[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r646585310

## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java ##

@@ -40,7 +43,11 @@
      * individual deletions.
      */
     public Map> deletedGroups() {
-        return futures;
+        Map> deletedGroups = new HashMap<>();

Review comment:
```suggestion
        Map> deletedGroups = new HashMap<>(this.futures.size());
```

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java ##

@@ -24,7 +24,7 @@
     public final String idValue;
     public final FindCoordinatorRequest.CoordinatorType type;

-    private CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {
+    public CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {

Review comment:
To me it would be more natural to specify the `type` first then the `idValue`.

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java ##

@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+public class DescribeConsumerGroupsHandler implements AdminApiHandler {
+
+    private final boolean includeAuthorizedOperations;
+    private final Logger log;
+    private final AdminApiLookupStrategy lookupStrategy;
+
+    public DescribeConsumerGroupsHandler(
+        Set groupIds,

Review comment:
Unused

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java ##

@@ -267,7 +278,11 @@ public void onFailure(
     private void clearInflightRequest(long currentTimeMs, RequestSpec spec) {
         RequestState requestState = requestStates.get(spec.scope);
         if (requestState != null) {
-            requestState.clearInflight(currentTimeMs);
+            if (spec.scope instanceof FulfillmentScope) {
+                requestState.clearInflight(currentTimeMs + retryBackoffMs);
+            } else {
+                requestState.clearInflight(currentTimeMs);

Review comment:
Why the differing behaviour depending on the scope? There's no need to back off on find coordinator requests?

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java ##

@@ -63,7 +64,7 @@
      *
      * @return result indicating key completion, failure, and unmapping
      */
-    ApiResult handleResponse(int brokerId, Set keys, AbstractResponse
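The presizing suggestion for `deletedGroups()` earlier in this message can be sketched roughly as follows. This is only an illustration under assumptions: the class name `DeletedGroupsSketch`, the `String` keys, and the use of `CompletableFuture<Void>` in place of Kafka's future type are all stand-ins, since the generic parameters are not visible in the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the result class; not the actual Kafka type.
final class DeletedGroupsSketch {
    // Assumed shape of the internal map (key and value types are guesses).
    private final Map<String, CompletableFuture<Void>> futures;

    DeletedGroupsSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    Map<String, CompletableFuture<Void>> deletedGroups() {
        // Pass the source size as the initial capacity, as the review suggests.
        Map<String, CompletableFuture<Void>> deletedGroups = new HashMap<>(this.futures.size());
        for (Map.Entry<String, CompletableFuture<Void>> entry : this.futures.entrySet()) {
            deletedGroups.put(entry.getKey(), entry.getValue());
        }
        return deletedGroups;
    }
}
```

Note that the `HashMap` constructor argument is an initial capacity and the default load factor is 0.75, so a map sized to exactly `size()` may still resize once while it is being filled.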
tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648530700

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##

@@ -1305,66 +1305,102 @@ class KafkaApis(val requestChannel: RequestChannel,
   }

   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
+    val version = request.header.apiVersion
+    if (version < 4) {
+      handleFindCoordinatorRequestLessThanV4(request)
+    } else {
+      handleFindCoordinatorRequestV4AndAbove(request)

Review comment:
Yes, obvious now you point it out! I guess I can't see a better way of abstracting over that.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r649872433

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##

@@ -858,6 +885,12 @@ public void onSuccess(ClientResponse resp, RequestFuture future) {
     public void onFailure(RuntimeException e, RequestFuture future) {
         log.debug("FindCoordinator request failed due to {}", e.toString());
+        if (e instanceof UnsupportedBatchLookupException) {

Review comment:
This `AdminApiDriver` abstraction is pretty new to me, so I might be wide of the mark, but it doesn't appear to handle this very well. The lookup strategy has to build a request without knowing either the broker or the API versions.

It would be possible to pass the `ApiVersions` to the `CoordinatorStrategy`, which should let you do the right thing based on the minimum of the FindCoordinator API version supported in the whole cluster. (That's not completely perfect, since really you'd want to decide on a per-broker basis, but I think it would be good enough). Sadly it's not quite enough to pass just `ApiVersions`, since it doesn't really know about the nodes in the cluster, so you'd need to pass `Metadata` too, which is quite a lot of work.

So I can understand why it makes sense to do it like this, since it lets you benefit from the existing logic for figuring out request versions.
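The alternative described in this comment (deciding on batching from the minimum FindCoordinator version across the cluster) might look something like the sketch below. It deliberately avoids Kafka's real `ApiVersions` and `Metadata` classes: the node id set and the per-node version map are hypothetical stand-ins for what those two objects would provide, and the threshold of 4 is taken from the `KafkaApis.scala` diff in this thread.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical helper; names and inputs are assumptions, not Kafka API.
final class BatchLookupDecisionSketch {
    // Versions below 4 take the single-key path in the quoted KafkaApis diff.
    static final int BATCHED_MIN_VERSION = 4;

    // Lowest max-supported version over all nodes; empty if any node is unknown
    // or there are no nodes at all.
    static OptionalInt clusterMinVersion(Set<Integer> nodeIds, Map<Integer, Integer> maxVersionByNode) {
        int min = Integer.MAX_VALUE;
        for (int nodeId : nodeIds) {
            Integer version = maxVersionByNode.get(nodeId);
            if (version == null) {
                return OptionalInt.empty();
            }
            min = Math.min(min, version);
        }
        return nodeIds.isEmpty() ? OptionalInt.empty() : OptionalInt.of(min);
    }

    // Batch only when every known node supports the batched version.
    static boolean canBatch(Set<Integer> nodeIds, Map<Integer, Integer> maxVersionByNode) {
        OptionalInt min = clusterMinVersion(nodeIds, maxVersionByNode);
        return min.isPresent() && min.getAsInt() >= BATCHED_MIN_VERSION;
    }
}
```

Needing both inputs is the cost the comment points out: the version map alone cannot say which nodes make up the cluster, so a node missing from it is treated here as "do not batch".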
tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r654335077

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ##

@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler> {
+
+    private final CoordinatorKey groupId;
+    private final Map offsets;
+    private final Logger log;
+    private final AdminApiLookupStrategy lookupStrategy;
+
+    public AlterConsumerGroupOffsetsHandler(
+        String groupId,
+        Map offsets,
+        LogContext logContext
+    ) {
+        this.groupId = CoordinatorKey.byGroupId(groupId);
+        this.offsets = offsets;
+        this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "offsetCommit";
+    }
+
+    @Override
+    public AdminApiLookupStrategy lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture> newFuture(
+        String groupId
+    ) {
+        return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+    }
+
+    @Override
+    public OffsetCommitRequest.Builder buildRequest(int brokerId, Set keys) {

Review comment:
brokerId -> coordinatorId, to emphasize the point that we've found the coordinators at this point?

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ##

@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.me
tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r657570794

## File path: clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java ##

@@ -29,9 +32,9 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-    private final Map> futures;
+    private final Map> futures;

-    DeleteConsumerGroupsResult(final Map> futures) {
+    DeleteConsumerGroupsResult(Map> futures) {

Review comment:
@skaundinya15 those methods in `KafkaFuture` are intentionally not `public` because a user receiving an instance should _never_ need to complete the future (they're always completed by the admin client). Making them `public` would thus make the API less type safe.
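The type-safety argument in this comment can be illustrated with a small sketch. The two classes below are hypothetical and are not Kafka's `KafkaFuture` hierarchy; they only show the general pattern of handing users a type with read-side methods while keeping completion on an internal implementation type.

```java
import java.util.concurrent.CompletableFuture;

// What a caller receives: read-side methods only (hypothetical type).
abstract class ReadOnlyFutureSketch<T> {
    abstract boolean isDone();

    abstract T getNow(T valueIfAbsent);
}

// What the client internals hold: the same future plus the ability to complete it.
final class InternalFutureSketch<T> extends ReadOnlyFutureSketch<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    @Override
    boolean isDone() {
        return delegate.isDone();
    }

    @Override
    T getNow(T valueIfAbsent) {
        return delegate.getNow(valueIfAbsent);
    }

    // Completion is exposed only on the internal type.
    boolean complete(T value) {
        return delegate.complete(value);
    }
}
```

With this split, code that only holds a `ReadOnlyFutureSketch` reference cannot complete the future without an explicit downcast, which is the property the comment wants to preserve.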
tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r658507802

## File path: clients/src/main/java/org/apache/kafka/common/errors/NoBatchedFindCoordinatorsException.java ##

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that it is not possible to lookup coordinators in batches with FindCoordinator. Instead
+ * coordinators must be looked up one by one.
+ */
+public class NoBatchedFindCoordinatorsException extends UnsupportedVersionException {

Review comment:
It seems like a mistake to add this to the public `errors` package when it should never propagate outside the client. An alternative could be to make it a static member class of `FindCoordinatorRequest`, or put it in some other package.
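One possible shape for the "static member class" alternative is sketched below. Everything here is hypothetical: `FindCoordinatorRequestSketch` is not the real request class, the nested exception extends `RuntimeException` rather than `UnsupportedVersionException` to keep the example self-contained, and `plan` is an invented caller that shows the exception being caught inside the client rather than reaching users.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the request class.
final class FindCoordinatorRequestSketch {
    // Threshold assumed from the KafkaApis diff in this thread.
    static final int BATCHED_MIN_VERSION = 4;

    // Nested so it lives next to the code that throws it, outside the public errors package.
    static final class NoBatchedFindCoordinatorsException extends RuntimeException {
        NoBatchedFindCoordinatorsException(String message) {
            super(message);
        }
    }

    // Rejects a multi-key lookup when the negotiated version cannot batch.
    static List<String> validate(List<String> keys, int version) {
        if (keys.size() > 1 && version < BATCHED_MIN_VERSION) {
            throw new NoBatchedFindCoordinatorsException(
                "version " + version + " cannot look up " + keys.size() + " keys at once");
        }
        return keys;
    }

    // Internal caller: falls back to one lookup per key instead of propagating the exception.
    static List<List<String>> plan(List<String> keys, int version) {
        try {
            return Collections.singletonList(validate(keys, version));
        } catch (NoBatchedFindCoordinatorsException e) {
            List<List<String>> oneByOne = new ArrayList<>();
            for (String key : keys) {
                oneByOne.add(Collections.singletonList(key));
            }
            return oneByOne;
        }
    }
}
```

Keeping the exception nested signals that it is an internal control-flow signal, which matches the concern that it should never propagate outside the client.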