[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-07 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r646585310



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##
@@ -40,7 +43,11 @@
  * individual deletions.
  */
 public Map<String, KafkaFuture<Void>> deletedGroups() {
-return futures;
+Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>();

Review comment:
   ```suggestion
   Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(this.futures.size());
   ```
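
A minimal sketch of the pre-sizing idea behind this suggestion, using generic stand-in types rather than the PR's classes (`PresizedCopy`, `capacityFor` and `rekey` are hypothetical names). One caveat: the `HashMap` constructor argument is an initial capacity, not an expected element count, so with the default load factor of 0.75 a map created with `new HashMap<>(n)` may still resize once before it holds `n` entries. Dividing by the load factor avoids that.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class PresizedCopy {
    // Initial capacity that lets `expected` entries fit without a rehash
    // under HashMap's default load factor of 0.75.
    static int capacityFor(int expected) {
        return (int) (expected / 0.75f) + 1;
    }

    // Re-keys `source` by a string id, allocating the target map once up front.
    // This stands in for converting internal keys to user-facing group ids.
    static <K, V> Map<String, V> rekey(Map<K, V> source, Function<K, String> toId) {
        Map<String, V> result = new HashMap<>(capacityFor(source.size()));
        for (Map.Entry<K, V> entry : source.entrySet()) {
            result.put(toId.apply(entry.getKey()), entry.getValue());
        }
        return result;
    }
}
```

For the handful of groups a typical admin call touches, the difference is negligible; the sketch only shows what "sized for the source map" means in practice.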

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##
@@ -24,7 +24,7 @@
 public final String idValue;
 public final FindCoordinatorRequest.CoordinatorType type;
 
-private CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {
+public CoordinatorKey(String idValue, FindCoordinatorRequest.CoordinatorType type) {

Review comment:
   To me it would be more natural to specify the `type` first, then the
   `idValue`.
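
A small sketch of what the type-first ordering could look like, assuming a stand-in class rather than the real `CoordinatorKey` (`LookupKey`, `Kind` and `byTransactionalId` are hypothetical here; `byGroupId` mirrors the factory used elsewhere in this PR). Keeping the constructor private and exposing named factories also means callers never have to remember the argument order.

```java
import java.util.Objects;

final class LookupKey {
    enum Kind { GROUP, TRANSACTION }

    final Kind kind;
    final String idValue;

    // Type first, then the id, as suggested in the review comment.
    private LookupKey(Kind kind, String idValue) {
        this.kind = Objects.requireNonNull(kind, "kind");
        this.idValue = Objects.requireNonNull(idValue, "idValue");
    }

    static LookupKey byGroupId(String groupId) {
        return new LookupKey(Kind.GROUP, groupId);
    }

    static LookupKey byTransactionalId(String transactionalId) {
        return new LookupKey(Kind.TRANSACTION, transactionalId);
    }

    // Two keys are equal only if both the kind and the id match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof LookupKey)) return false;
        LookupKey other = (LookupKey) o;
        return kind == other.kind && idValue.equals(other.idValue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(kind, idValue);
    }

    @Override
    public String toString() {
        return "LookupKey(kind=" + kind + ", idValue=" + idValue + ")";
    }
}
```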

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+public class DescribeConsumerGroupsHandler implements AdminApiHandler {
+
+private final boolean includeAuthorizedOperations;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public DescribeConsumerGroupsHandler(
+Set groupIds,

Review comment:
   `groupIds` is unused.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -267,7 +278,11 @@ public void onFailure(
 private void clearInflightRequest(long currentTimeMs, RequestSpec spec) {
 RequestState requestState = requestStates.get(spec.scope);
 if (requestState != null) {
-requestState.clearInflight(currentTimeMs);
+if (spec.scope instanceof FulfillmentScope) {
+requestState.clearInflight(currentTimeMs + retryBackoffMs);
+} else {
+requestState.clearInflight(currentTimeMs);

Review comment:
   Why does the behaviour differ depending on the scope? Is there no need to
   back off on FindCoordinator requests?
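
To make the question concrete, here is a minimal sketch of how the argument to `clearInflight` could gate the next send. It assumes the request state simply records a "next allowed retry" timestamp; `InflightTracker`, `markInflight` and `canSend` are hypothetical names, not the driver's actual members.

```java
final class InflightTracker {
    private boolean inflight = false;
    private long nextAllowedRetryMs = 0L;

    // Marks a request as sent and awaiting a response.
    void markInflight() {
        inflight = true;
    }

    // Clears the in-flight marker and records the earliest time at which
    // another request for this scope may be sent.
    void clearInflight(long nextAllowedRetryMs) {
        this.inflight = false;
        this.nextAllowedRetryMs = nextAllowedRetryMs;
    }

    // A request may go out only if nothing is in flight and the retry
    // deadline has passed.
    boolean canSend(long nowMs) {
        return !inflight && nowMs >= nextAllowedRetryMs;
    }
}
```

Under this model, passing `currentTimeMs + retryBackoffMs` delays the retry by one backoff interval, while passing `currentTimeMs` allows an immediate retry, which is the asymmetry the comment asks about.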

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##
@@ -63,7 +64,7 @@
  *
  * @return result indicating key completion, failure, and unmapping
  */
-ApiResult handleResponse(int brokerId, Set keys, AbstractResponse 

[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-09 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r648530700



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1305,66 +1305,102 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
+val version = request.header.apiVersion
+if (version < 4) {
+  handleFindCoordinatorRequestLessThanV4(request)
+} else {
+  handleFindCoordinatorRequestV4AndAbove(request)

Review comment:
   Yes, obvious now you point it out! I guess I can't see a better way of 
abstracting over that. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-14 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r649872433



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -858,6 +885,12 @@ public void onSuccess(ClientResponse resp, RequestFuture future) {
 public void onFailure(RuntimeException e, RequestFuture future) {
 log.debug("FindCoordinator request failed due to {}", e.toString());
 
+if (e instanceof UnsupportedBatchLookupException) {

Review comment:
   This `AdminApiDriver` abstraction is pretty new to me, so I might be 
wide of the mark, but it doesn't appear to handle this very well. The lookup 
strategy has to build a request without knowing either the broker or the API 
versions. It would be possible to pass the `ApiVersions` to the 
`CoordinatorStrategy`, which should let you do the right thing based on the 
minimum of the FindCoordinator API version supported in the whole cluster. 
(That's not completely perfect, since really you'd want to decide on a 
per-broker basis, but I think it would be good enough). Sadly it's not quite 
enough to pass just `ApiVersions`, since it doesn't really know about the nodes 
in the cluster, so you'd need to pass `Metadata` too, which is quite a lot of 
work. So I can understand why it makes sense to do it like this, since it lets 
you benefit from the existing logic for figuring out request versions. 
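
The "minimum version across the cluster" alternative described above could be sketched roughly as follows. This is only an illustration under stated assumptions: the per-node maximum versions are given as a plain map (standing in for what `ApiVersions` plus `Metadata` would provide), `BatchLookupDecision` and `canBatch` are hypothetical names, and the threshold of 4 is taken from the `version < 4` check in the `KafkaApis` hunk quoted earlier in this thread.

```java
import java.util.Map;

final class BatchLookupDecision {
    // First FindCoordinator version that accepts several keys per request
    // (assumed from the `version < 4` branch in the quoted KafkaApis change).
    static final short MIN_BATCHED_VERSION = 4;

    // Returns true only if every known node supports batched lookups.
    // An empty map means nothing is known yet, so fall back to single lookups.
    static boolean canBatch(Map<Integer, Short> maxVersionByNode) {
        if (maxVersionByNode.isEmpty()) {
            return false;
        }
        short min = Short.MAX_VALUE;
        for (short version : maxVersionByNode.values()) {
            if (version < min) {
                min = version;
            }
        }
        return min >= MIN_BATCHED_VERSION;
    }
}
```

As the comment notes, this is coarser than deciding per broker: a single old broker disables batching for the whole cluster.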








[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-18 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r654335077



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
+import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+public class AlterConsumerGroupOffsetsHandler implements AdminApiHandler> {
+
+private final CoordinatorKey groupId;
+private final Map offsets;
+private final Logger log;
+private final AdminApiLookupStrategy lookupStrategy;
+
+public AlterConsumerGroupOffsetsHandler(
+String groupId,
+Map offsets,
+LogContext logContext
+) {
+this.groupId = CoordinatorKey.byGroupId(groupId);
+this.offsets = offsets;
+this.log = logContext.logger(AlterConsumerGroupOffsetsHandler.class);
+this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+}
+
+@Override
+public String apiName() {
+return "offsetCommit";
+}
+
+@Override
+public AdminApiLookupStrategy lookupStrategy() {
+return lookupStrategy;
+}
+
+public static AdminApiFuture.SimpleAdminApiFuture> newFuture(
+String groupId
+) {
+return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+}
+
+@Override
+public OffsetCommitRequest.Builder buildRequest(int brokerId, Set keys) {

Review comment:
   Rename `brokerId` to `coordinatorId`, to emphasize that the coordinators
   have already been found by this point?


[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-23 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r657570794



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
##
@@ -29,9 +32,9 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-private final Map> futures;
+private final Map> futures;
 
-DeleteConsumerGroupsResult(final Map> futures) {
+DeleteConsumerGroupsResult(Map> futures) {

Review comment:
   @skaundinya15 those methods in `KafkaFuture` are intentionally not 
`public` because a user receiving an instance should _never_ need to complete 
the future (they're always completed by the admin client). Making them `public` 
would thus make the API less type safe. 
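
The design point can be sketched with stand-in classes (all names below are hypothetical and are not Kafka's own types): the type handed to users exposes only read operations, while the completable implementation stays with the client that owns it.

```java
import java.util.concurrent.CompletableFuture;

// What callers receive: they can observe the result but have no public
// way to complete it.
abstract class ReadOnlyFuture<T> {
    abstract boolean isDone();

    // Blocks until a value is available, then returns it.
    abstract T join();

    // Not part of the caller-facing surface: only subclasses (and, in Java,
    // same-package code) can call this.
    protected abstract boolean complete(T value);
}

// The implementation widens `complete` so the owning client can call it.
final class CompletableResultFuture<T> extends ReadOnlyFuture<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    @Override
    boolean isDone() {
        return delegate.isDone();
    }

    @Override
    T join() {
        return delegate.join();
    }

    @Override
    public boolean complete(T value) {
        return delegate.complete(value);
    }
}

// Stand-in for the admin client: it keeps the completable handle private
// and hands out only the read-only view.
final class SketchClient {
    private final CompletableResultFuture<String> pending = new CompletableResultFuture<>();

    ReadOnlyFuture<String> result() {
        return pending;
    }

    void onResponse(String value) {
        pending.complete(value);
    }
}
```

A caller holding the `ReadOnlyFuture` from `result()` can wait on it, but the static type gives it no public method for completing the future, which is the kind of type safety the comment refers to.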








[GitHub] [kafka] tombentley commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time

2021-06-24 Thread GitBox


tombentley commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r658507802



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/NoBatchedFindCoordinatorsException.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that it is not possible to lookup coordinators in batches with FindCoordinator. Instead
+ * coordinators must be looked up one by one.
+ */
+public class NoBatchedFindCoordinatorsException extends UnsupportedVersionException {

Review comment:
   It seems like a mistake to add this to the public `errors` package when 
it should never propagate outside the client. An alternative could be to make 
it a static member class of `FindCoordinatorRequest`, or put it in some other 
package.
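
A rough sketch of the static-member-class alternative, using stand-in types (`LookupRequest`, `NoBatchedLookupException` and `build` are hypothetical, and `UnsupportedOperationException` merely stands in for the PR's `UnsupportedVersionException` supertype). Nesting the exception inside the request class keeps it next to the only code that throws it, and out of the package users browse for public errors.

```java
import java.util.List;

final class LookupRequest {
    // Nested so the exception lives with the request that raises it.
    static final class NoBatchedLookupException extends UnsupportedOperationException {
        NoBatchedLookupException(String message) {
            super(message);
        }
    }

    // Assumed first version that accepts more than one key per request.
    static final short MIN_BATCHED_VERSION = 4;

    // Builds a textual stand-in for the request, rejecting multi-key
    // lookups on versions that only support a single key.
    static String build(List<String> keys, short version) {
        if (keys.size() > 1 && version < MIN_BATCHED_VERSION) {
            throw new NoBatchedLookupException(
                "Version " + version + " cannot look up " + keys.size() + " keys at once");
        }
        return "lookup(v" + version + ", " + keys + ")";
    }
}
```

Client code would catch `LookupRequest.NoBatchedLookupException` internally and retry key by key, so the exception never needs to reach user code.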



