This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 16ad358  KAFKA-6868; Fix buffer underflow and expose group state in 
the consumer groups API (#4980)
16ad358 is described below

commit 16ad358d64a138fc4b455379745ae1550a93d57b
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Mon May 21 08:37:35 2018 -0700

    KAFKA-6868; Fix buffer underflow and expose group state in the consumer 
groups API (#4980)
    
    * The consumer groups API should expose group state and coordinator 
information.  This information is needed by administrative tools and scripts 
that access consume groups.
    
    * The partition assignment will be empty when the group is rebalancing. Fix 
an issue where the adminclient attempted to deserialize this empty buffer.
    
    * Remove nulls from the API and make all collections immutable.
    
    * DescribeConsumerGroupsResult#all should return a result as expected, 
rather than Void
    
    * Fix exception text for GroupIdNotFoundException, GroupNotEmptyException. 
It was being filled in as "The group id The group id does not exist was not 
found" and similar.
    
    Reviewers: Attila Sasvari <asasv...@apache.org>, Andras Beni 
<andrasb...@cloudera.com>, Dong Lin <lindon...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>
---
 .../clients/admin/ConsumerGroupDescription.java    |  78 +++++++++-----
 .../admin/DescribeConsumerGroupsResult.java        |  26 ++++-
 .../kafka/clients/admin/KafkaAdminClient.java      |  51 +++++----
 .../kafka/clients/admin/MemberAssignment.java      |  13 ++-
 .../kafka/clients/admin/MemberDescription.java     |  45 ++++----
 .../apache/kafka/common/ConsumerGroupState.java    |  61 +++++++++++
 .../common/errors/GroupIdNotFoundException.java    |  12 +--
 .../common/errors/GroupNotEmptyException.java      |  12 +--
 .../main/scala/kafka/tools/StreamsResetter.java    |   3 +-
 .../kafka/api/AdminClientIntegrationTest.scala     | 120 ++++++++++++++++++++-
 10 files changed, 315 insertions(+), 106 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 0bfa8a7..bc3857d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -17,55 +17,56 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
 
 /**
  * A detailed description of a single consumer group in the cluster.
  */
 public class ConsumerGroupDescription {
-
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
-    private final List<MemberDescription> members;
+    private final Collection<MemberDescription> members;
     private final String partitionAssignor;
+    private final ConsumerGroupState state;
+    private final Node coordinator;
 
-    /**
-     * Creates an instance with the specified parameters.
-     *
-     * @param groupId               The consumer group id
-     * @param isSimpleConsumerGroup If Consumer Group is simple
-     * @param members               The consumer group members
-     * @param partitionAssignor     The consumer group partition assignor
-     */
-    public ConsumerGroupDescription(String groupId, boolean 
isSimpleConsumerGroup, List<MemberDescription> members, String 
partitionAssignor) {
-        this.groupId = groupId;
+    ConsumerGroupDescription(String groupId,
+            boolean isSimpleConsumerGroup,
+            Collection<MemberDescription> members,
+            String partitionAssignor,
+            ConsumerGroupState state,
+            Node coordinator) {
+        this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
-        this.members = members;
-        this.partitionAssignor = partitionAssignor;
+        this.members = members == null ? 
Collections.<MemberDescription>emptyList() :
+            Collections.unmodifiableList(new ArrayList<>(members));
+        this.partitionAssignor = partitionAssignor == null ? "" : 
partitionAssignor;
+        this.state = state;
+        this.coordinator = coordinator;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-
         ConsumerGroupDescription that = (ConsumerGroupDescription) o;
-
-        if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
-        if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != 
null) return false;
-        if (members != null ? !members.equals(that.members) : that.members != 
null) return false;
-        return partitionAssignor != null ? 
partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == 
null;
+        return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
+            groupId.equals(that.groupId) &&
+            members.equals(that.members) &&
+            partitionAssignor.equals(that.partitionAssignor) &&
+            state.equals(that.state);
     }
 
     @Override
     public int hashCode() {
-        int result = groupId != null ? groupId.hashCode() : 0;
-        result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
-        result = 31 * result + (members != null ? members.hashCode() : 0);
-        result = 31 * result + (partitionAssignor != null ? 
partitionAssignor.hashCode() : 0);
-        return result;
+        return Objects.hash(isSimpleConsumerGroup, groupId, members, 
partitionAssignor, state);
     }
 
     /**
@@ -85,7 +86,7 @@ public class ConsumerGroupDescription {
     /**
      * A list of the members of the consumer group.
      */
-    public List<MemberDescription> members() {
+    public Collection<MemberDescription> members() {
         return members;
     }
 
@@ -96,9 +97,28 @@ public class ConsumerGroupDescription {
         return partitionAssignor;
     }
 
+    /**
+     * The consumer group state, or UNKNOWN if the state is too new for us to 
parse.
+     */
+    public ConsumerGroupState state() {
+        return state;
+    }
+
+    /**
+     * The consumer group coordinator, or null if the coordinator is not known.
+     */
+    public Node coordinator() {
+        return coordinator;
+    }
+
     @Override
     public String toString() {
-        return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + 
isSimpleConsumerGroup + ", members=" +
-            Utils.join(members, ",") + ", partitionAssignor=" + 
partitionAssignor + ")";
+        return "(groupId=" + groupId +
+            ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+            ", members=" + Utils.join(members, ",") +
+            ", partitionAssignor=" + partitionAssignor +
+            ", state=" + state +
+            ", coordinator=" + coordinator +
+            ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index ac2189c..8f0ebad 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -21,7 +21,9 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 
 /**
@@ -39,16 +41,32 @@ public class DescribeConsumerGroupsResult {
     }
 
     /**
-     * Return a map from group id to futures which can be used to check the 
description of a consumer group.
+     * Return a map from group id to futures which yield group descriptions.
      */
     public Map<String, KafkaFuture<ConsumerGroupDescription>> 
describedGroups() {
         return futures;
     }
 
     /**
-     * Return a future which succeeds only if all the consumer group 
description succeed.
+     * Return a future which yields all ConsumerGroupDescription objects, if 
all the describes succeed.
      */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    public KafkaFuture<Map<String, ConsumerGroupDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+            new KafkaFuture.BaseFunction<Void, Map<String, 
ConsumerGroupDescription>>() {
+                @Override
+                public Map<String, ConsumerGroupDescription> apply(Void v) {
+                    try {
+                        Map<String, ConsumerGroupDescription> descriptions = 
new HashMap<>(futures.size());
+                        for (Map.Entry<String, 
KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) {
+                            descriptions.put(entry.getKey(), 
entry.getValue().get());
+                        }
+                        return descriptions;
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c9e0e18..5f4eefe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
@@ -120,11 +121,9 @@ import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -2347,14 +2346,21 @@ public class KafkaAdminClient extends AdminClient {
 
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
-                    final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
+                    final FindCoordinatorResponse fcResponse = 
(FindCoordinatorResponse) abstractResponse;
+                    Errors error = fcResponse.error();
+                    if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+                        // Retry COORDINATOR_NOT_AVAILABLE, in case the error 
is temporary.
+                        throw error.exception();
+                    } else if (error != Errors.NONE) {
+                        // All other errors are immediate failures.
+                        KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
+                        future.completeExceptionally(error.exception());
+                        return;
+                    }
 
                     final long nowDescribeConsumerGroups = time.milliseconds();
-
-                    final int nodeId = response.node().id();
-
+                    final int nodeId = fcResponse.node().id();
                     runnable.call(new Call("describeConsumerGroups", deadline, 
new ConstantNodeIdProvider(nodeId)) {
-
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
                             return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
@@ -2375,24 +2381,29 @@ public class KafkaAdminClient extends AdminClient {
                                 final String protocolType = 
groupMetadata.protocolType();
                                 if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
                                     final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
-                                    final List<MemberDescription> consumers = 
new ArrayList<>(members.size());
+                                    final List<MemberDescription> 
memberDescriptions = new ArrayList<>(members.size());
 
                                     for (DescribeGroupsResponse.GroupMember 
groupMember : members) {
-                                        final PartitionAssignor.Assignment 
assignment =
-                                                
ConsumerProtocol.deserializeAssignment(
-                                                        
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
-
+                                        Set<TopicPartition> partitions = 
Collections.emptySet();
+                                        if 
(groupMember.memberAssignment().remaining() > 0) {
+                                            final PartitionAssignor.Assignment 
assignment = ConsumerProtocol.
+                                                
deserializeAssignment(groupMember.memberAssignment().duplicate());
+                                            partitions = new 
HashSet<>(assignment.partitions());
+                                        }
                                         final MemberDescription 
memberDescription =
-                                                new MemberDescription(
-                                                        groupMember.memberId(),
-                                                        groupMember.clientId(),
-                                                        
groupMember.clientHost(),
-                                                        new 
MemberAssignment(assignment.partitions()));
-                                        consumers.add(memberDescription);
+                                            new 
MemberDescription(groupMember.memberId(),
+                                                groupMember.clientId(),
+                                                groupMember.clientHost(),
+                                                new 
MemberAssignment(partitions));
+                                        
memberDescriptions.add(memberDescription);
                                     }
-                                    final String protocol = 
groupMetadata.protocol();
                                     final ConsumerGroupDescription 
consumerGroupDescription =
-                                            new 
ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+                                            new 
ConsumerGroupDescription(groupId,
+                                                protocolType.isEmpty(),
+                                                memberDescriptions,
+                                                groupMetadata.protocol(),
+                                                
ConsumerGroupState.parse(groupMetadata.state()),
+                                                fcResponse.node());
                                     future.complete(consumerGroupDescription);
                                 }
                             }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
index bd95813..6c180ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -19,21 +19,24 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.List;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * A description of the assignments of a specific group member.
  */
 public class MemberAssignment {
-    private final List<TopicPartition> topicPartitions;
+    private final Set<TopicPartition> topicPartitions;
 
     /**
      * Creates an instance with the specified parameters.
      *
      * @param topicPartitions List of topic partitions
      */
-    public MemberAssignment(List<TopicPartition> topicPartitions) {
-        this.topicPartitions = topicPartitions;
+    MemberAssignment(Set<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions == null ? 
Collections.<TopicPartition>emptySet() :
+            Collections.unmodifiableSet(new HashSet<>(topicPartitions));
     }
 
     @Override
@@ -54,7 +57,7 @@ public class MemberAssignment {
     /**
      * The topic partitions assigned to a group member.
      */
-    public List<TopicPartition> topicPartitions() {
+    public Set<TopicPartition> topicPartitions() {
         return topicPartitions;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 2ba1963..895abad 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -17,49 +17,42 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+
 /**
  * A detailed description of a single group instance in the cluster.
  */
 public class MemberDescription {
-
     private final String memberId;
     private final String clientId;
     private final String host;
     private final MemberAssignment assignment;
 
-    /**
-     * Creates an instance with the specified parameters.
-     *
-     * @param memberId The consumer id
-     * @param clientId   The client id
-     * @param host       The host
-     * @param assignment The assignment
-     */
-    public MemberDescription(String memberId, String clientId, String host, 
MemberAssignment assignment) {
-        this.memberId = memberId;
-        this.clientId = clientId;
-        this.host = host;
-        this.assignment = assignment;
+    MemberDescription(String memberId, String clientId, String host, 
MemberAssignment assignment) {
+        this.memberId = memberId == null ? "" : memberId;
+        this.clientId = clientId == null ? "" : clientId;
+        this.host = host == null ? "" : host;
+        this.assignment = assignment == null ?
+            new MemberAssignment(Collections.<TopicPartition>emptySet()) : 
assignment;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-
         MemberDescription that = (MemberDescription) o;
-
-        if (memberId != null ? !memberId.equals(that.memberId) : that.memberId 
!= null) return false;
-        if (clientId != null ? !clientId.equals(that.clientId) : that.clientId 
!= null) return false;
-        return assignment != null ? assignment.equals(that.assignment) : 
that.assignment == null;
+        return memberId.equals(that.memberId) &&
+            clientId.equals(that.clientId) &&
+            host.equals(that.host) &&
+            assignment.equals(that.assignment);
     }
 
     @Override
     public int hashCode() {
-        int result = memberId != null ? memberId.hashCode() : 0;
-        result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
-        result = 31 * result + (assignment != null ? assignment.hashCode() : 
0);
-        return result;
+        return Objects.hash(memberId, clientId, host, assignment);
     }
 
     /**
@@ -92,7 +85,9 @@ public class MemberDescription {
 
     @Override
     public String toString() {
-        return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" 
+ host + ", assignment=" +
-            assignment + ")";
+        return "(memberId=" + memberId +
+            ", clientId=" + clientId +
+            ", host=" + host +
+            ", assignment=" + assignment + ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java 
b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
new file mode 100644
index 0000000..7f3d4f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.HashMap;
+
+/**
+ * The consumer group state.
+ */
+public enum ConsumerGroupState {
+    UNKNOWN("Unknown"),
+    PREPARING_REBALANCE("PreparingRebalance"),
+    COMPLETING_REBALANCE("CompletingRebalance"),
+    STABLE("Stable"),
+    DEAD("Dead"),
+    EMPTY("Empty");
+
+    private final static HashMap<String, ConsumerGroupState> NAME_TO_ENUM;
+
+    static {
+        NAME_TO_ENUM = new HashMap<>();
+        for (ConsumerGroupState state : ConsumerGroupState.values()) {
+            NAME_TO_ENUM.put(state.name, state);
+        }
+    }
+
+    private final String name;
+
+    ConsumerGroupState(String name) {
+        this.name = name;
+    }
+
+
+    /**
+     * Parse a string into a consumer group state.
+     */
+    public static ConsumerGroupState parse(String name) {
+        ConsumerGroupState state = NAME_TO_ENUM.get(name);
+        return state == null ? UNKNOWN : state;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
index 1ff30f1..a4d509d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
@@ -17,15 +17,7 @@
 package org.apache.kafka.common.errors;
 
 public class GroupIdNotFoundException extends ApiException {
-    private final String groupId;
-
-    public GroupIdNotFoundException(String groupId) {
-        super("The group id " + groupId + " was not found");
-        this.groupId = groupId;
-    }
-
-    public String groupId() {
-        return groupId;
+    public GroupIdNotFoundException(String message) {
+        super(message);
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
index 264e613..e15b3e6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
@@ -17,15 +17,7 @@
 package org.apache.kafka.common.errors;
 
 public class GroupNotEmptyException extends ApiException {
-    private final String groupId;
-
-    public GroupNotEmptyException(String groupId) {
-        super("The group " + groupId + " is not empty");
-        this.groupId = groupId;
-    }
-
-    public String groupId() {
-        return groupId;
+    public GroupNotEmptyException(String message) {
+        super(message);
     }
-
 }
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index d7c4e43..3c045c6 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -157,7 +157,8 @@ public class StreamsResetter {
                                            final AdminClient adminClient) 
throws ExecutionException, InterruptedException {
         final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(Arrays.asList(groupId),
                 (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
-        final List<MemberDescription> members = 
describeResult.describedGroups().get(groupId).get().members();
+        final List<MemberDescription> members =
+            new 
ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members());
         if (!members.isEmpty()) {
             throw new IllegalStateException("Consumer group '" + groupId + "' 
is still active "
                     + "and has following members: " + members + ". "
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index b31c09d..e7dd108 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -31,10 +31,10 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaFuture, TopicPartition, 
TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, 
TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
 import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.internals.Topic
 import org.scalatest.Assertions.intercept
 
 import scala.concurrent.duration.Duration
@@ -98,6 +99,7 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
       config.setProperty(KafkaConfig.InterBrokerListenerNameProp, 
listenerName.value)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"${listenerName.value}:${securityProtocol.name}")
       config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+      config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
       // We set this in order to test that we don't expose sensitive data via 
describe configs. This will already be
       // set for subclasses with security enabled and we don't want to 
overwrite it.
       if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
@@ -959,6 +961,120 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     client.close()
     assertEquals(1, factory.failuresInjected)
   }
+
+  /**
+    * Test the consumer group APIs.
+    */
+  @Test
+  def testConsumerGroups(): Unit = {
+    val config = createConfig()
+    val client = AdminClient.create(config)
+    try {
+      // Verify that initially there are no consumer groups to list.
+      val list1 = client.listConsumerGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+      client.createTopics(Collections.singleton(
+        new NewTopic(testTopicName, testNumPartitions, 1))).all().get()
+      val producer = createNewProducer
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+      val newConsumerConfig = new Properties(consumerConfig)
+      newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+      val consumer = TestUtils.createNewConsumer(brokerList,
+        securityProtocol = this.securityProtocol,
+        trustStoreFile = this.trustStoreFile,
+        saslProperties = this.clientSaslProperties,
+        props = Some(newConsumerConfig))
+      try {
+        // Start a consumer in a thread that will subscribe to a new group.
+        val consumerThread = new Thread {
+          override def run {
+            consumer.subscribe(Collections.singleton(testTopicName))
+            while (true) {
+              consumer.poll(5000)
+              consumer.commitSync()
+            }
+          }
+        }
+        try {
+          consumerThread.start
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listConsumerGroups().all().get().asScala.
+              filter(listing => listing.groupId().equals(testGroupId))
+            !matching.isEmpty
+          }, s"Expected to be able to list $testGroupId")
+
+          val result = client.describeConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
+          assertEquals(2, result.describedGroups().size())
+
+          // Test that we can get information about the test consumer group.
+          assertTrue(result.describedGroups().containsKey(testGroupId))
+          val testGroupDescription = 
result.describedGroups().get(testGroupId).get()
+          assertEquals(testGroupId, testGroupDescription.groupId())
+          assertFalse(testGroupDescription.isSimpleConsumerGroup())
+          assertEquals(1, testGroupDescription.members().size())
+          val member = testGroupDescription.members().iterator().next()
+          assertEquals(testClientId, member.clientId())
+          val topicPartitions = member.assignment().topicPartitions()
+          assertEquals(testNumPartitions, topicPartitions.size())
+          assertEquals(testNumPartitions, topicPartitions.asScala.
+            count(tp => tp.topic().equals(testTopicName)))
+
+          // Test that the fake group is listed as dead.
+          assertTrue(result.describedGroups().containsKey(fakeGroupId))
+          val fakeGroupDescription = 
result.describedGroups().get(fakeGroupId).get()
+          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+          assertEquals(0, fakeGroupDescription.members().size())
+          assertEquals("", fakeGroupDescription.partitionAssignor())
+          assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
+
+          // Test that all() returns 2 results
+          assertEquals(2, result.all().get().size())
+
+          // Test listConsumerGroupOffsets
+          val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+          TestUtils.waitUntilTrue(() => {
+            val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+            val part = new TopicPartition(testTopicName, 0)
+            parts.containsKey(part) && (parts.get(part).offset() == 1)
+          }, s"Expected the offset for partition 0 to eventually become 1.")
+
+          // Test consumer group deletion
+          val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
+          assertEquals(2, deleteResult.deletedGroups().size())
+
+          // Deleting the fake group ID should get GroupIdNotFoundException.
+          assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
+          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+            classOf[GroupIdNotFoundException])
+
+          // Deleting the real group ID should get GroupNotEmptyException
+          assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
+          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+            classOf[GroupNotEmptyException])
+        } finally {
+          consumerThread.interrupt()
+          consumerThread.join()
+        }
+      } finally {
+        Utils.closeQuietly(consumer, "consumer")
+      }
+    } finally {
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
 }
 
 object AdminClientIntegrationTest {

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to