Repository: kafka
Updated Branches:
  refs/heads/trunk 55abe65e0 -> 7a84b241e


MINOR: Some cleanups and additional testing for KIP-88

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Vahid Hashemian <vahidhashem...@us.ibm.com>, Ismael Juma <ism...@juma.me.uk>

Closes #2383 from hachikuji/minor-cleanup-kip-88


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a84b241
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a84b241
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a84b241

Branch: refs/heads/trunk
Commit: 7a84b241eeb8cb63400a9512b066c3f733f94b8c
Parents: 55abe65
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jan 17 10:42:05 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Jan 17 10:42:05 2017 -0800

----------------------------------------------------------------------
 .../common/requests/OffsetFetchRequest.java     | 24 +++++-
 .../common/requests/OffsetFetchResponse.java    | 83 ++++++-------------
 .../clients/consumer/KafkaConsumerTest.java     |  2 +-
 .../internals/ConsumerCoordinatorTest.java      |  4 +-
 .../common/requests/RequestResponseTest.java    | 23 ++++--
 .../scala/kafka/api/OffsetFetchRequest.scala    |  1 -
 .../kafka/coordinator/GroupCoordinator.scala    |  4 +-
 .../coordinator/GroupMetadataManager.scala      | 31 ++++---
 .../src/main/scala/kafka/server/KafkaApis.scala | 86 ++++++++------------
 .../kafka/api/AuthorizerIntegrationTest.scala   | 37 ++++++++-
 .../GroupCoordinatorResponseTest.scala          | 58 ++++++++++++-
 11 files changed, 211 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 0ff49be..553fd96 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -78,7 +79,7 @@ public class OffsetFetchRequest extends AbstractRequest {
     private final List<TopicPartition> partitions;
 
     public static OffsetFetchRequest forAllPartitions(String groupId) {
-        return new OffsetFetchRequest.Builder(groupId, (List<TopicPartition>) 
null).setVersion((short) 2).build();
+        return new OffsetFetchRequest.Builder(groupId, 
null).setVersion((short) 2).build();
     }
 
     // v0, v1, and v2 have the same fields.
@@ -131,20 +132,35 @@ public class OffsetFetchRequest extends AbstractRequest {
         groupId = struct.getString(GROUP_ID_KEY_NAME);
     }
 
-    @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
+    public OffsetFetchResponse getErrorResponse(Errors error) {
         short versionId = version();
+
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responsePartitions = new HashMap<>();
+        if (versionId < 2) {
+            for (TopicPartition partition : this.partitions) {
+                responsePartitions.put(partition, new 
OffsetFetchResponse.PartitionData(
+                        OffsetFetchResponse.INVALID_OFFSET,
+                        OffsetFetchResponse.NO_METADATA,
+                        error));
+            }
+        }
+
         switch (versionId) {
             case 0:
             case 1:
             case 2:
-                return new OffsetFetchResponse(Errors.forException(e), 
partitions, versionId);
+                return new OffsetFetchResponse(error, responsePartitions, 
versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)));
         }
     }
 
+    @Override
+    public OffsetFetchResponse getErrorResponse(Throwable e) {
+        return getErrorResponse(Errors.forException(e));
+    }
+
     public String groupId() {
         return groupId;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 0095f38..9c14155 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -46,6 +46,8 @@ public class OffsetFetchResponse extends AbstractResponse {
 
     public static final long INVALID_OFFSET = -1L;
     public static final String NO_METADATA = "";
+    public static final PartitionData UNKNOWN_PARTITION = new 
PartitionData(INVALID_OFFSET, NO_METADATA,
+            Errors.UNKNOWN_TOPIC_OR_PARTITION);
 
     /**
      * Possible error codes:
@@ -59,7 +61,7 @@ public class OffsetFetchResponse extends AbstractResponse {
      *   - GROUP_AUTHORIZATION_FAILED (30)
      */
 
-    public static final List<Errors> PARTITION_ERRORS = Arrays.asList(
+    private static final List<Errors> PARTITION_ERRORS = Arrays.asList(
             Errors.UNKNOWN_TOPIC_OR_PARTITION,
             Errors.TOPIC_AUTHORIZATION_FAILED);
 
@@ -82,14 +84,30 @@ public class OffsetFetchResponse extends AbstractResponse {
         }
     }
 
-    private List<Struct> getTopicArray(Map<TopicPartition, PartitionData> 
responseData) {
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
+    /**
+     * Constructor for the latest version.
+     * @param error Potential coordinator or group level error code
+     * @param responseData Fetched offset information grouped by 
topic-partition
+     */
+    public OffsetFetchResponse(Errors error, Map<TopicPartition, 
PartitionData> responseData) {
+        this(error, responseData, CURRENT_VERSION);
+    }
 
-        List<Struct> topicArray = new ArrayList<Struct>();
+    /**
+     * Unified constructor for all versions.
+     * @param error Potential coordinator or group level error code (for api 
version 2 and later)
+     * @param responseData Fetched offset information grouped by 
topic-partition
+     * @param version The request API version
+     */
+    public OffsetFetchResponse(Errors error, Map<TopicPartition, 
PartitionData> responseData, int version) {
+        super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, 
version)));
+
+        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupDataByTopic(responseData);
+        List<Struct> topicArray = new ArrayList<>();
         for (Map.Entry<String, Map<Integer, PartitionData>> entries : 
topicsData.entrySet()) {
             Struct topicData = this.struct.instance(RESPONSES_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
+            List<Struct> partitionArray = new ArrayList<>();
             for (Map.Entry<Integer, PartitionData> partitionEntry : 
entries.getValue().entrySet()) {
                 PartitionData fetchPartitionData = partitionEntry.getValue();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -103,66 +121,17 @@ public class OffsetFetchResponse extends AbstractResponse {
             topicArray.add(topicData);
         }
 
-        return topicArray;
-    }
-
-    /**
-     * Unified constructor
-     * @param responseData Fetched offset information grouped by 
topic-partition
-     * @param topLevelErrorCode Potential coordinator or group level error 
code (for api version 2 and later)
-     * @param version The request API version
-     */
-    public OffsetFetchResponse(Errors topLevelError, Map<TopicPartition, 
PartitionData> responseData, int version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, 
version)));
-
-        this.struct.set(RESPONSES_KEY_NAME, 
getTopicArray(responseData).toArray());
+        this.struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
         this.responseData = responseData;
-        this.error = topLevelError;
+        this.error = error;
         if (version > 1)
             this.struct.set(ERROR_CODE_KEY_NAME, this.error.code());
     }
 
-    /**
-     * Unified constructor (used only if there are errors in the response)
-     * @param partitions partitions to be included in the response
-     * @param topLevelErrorCode The error code to be reported in the response
-     * @param version The request API version
-     */
-    public OffsetFetchResponse(Errors topLevelError, List<TopicPartition> 
partitions, int version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, 
version)));
-
-        assert topLevelError != Errors.NONE;
-        this.responseData = new HashMap<>();
-        this.error = topLevelError;
-        if (version < 2) {
-            for (TopicPartition partition : partitions) {
-                this.responseData.put(partition, new 
OffsetFetchResponse.PartitionData(
-                        OffsetFetchResponse.INVALID_OFFSET,
-                        OffsetFetchResponse.NO_METADATA,
-                        topLevelError));
-            }
-        } else
-            this.struct.set(ERROR_CODE_KEY_NAME, this.error.code());
-
-        this.struct.set(RESPONSES_KEY_NAME, 
getTopicArray(this.responseData).toArray());
-    }
-
-    public OffsetFetchResponse(Map<TopicPartition, PartitionData> 
responseData) {
-        this(Errors.NONE, responseData, CURRENT_VERSION);
-    }
-
-    /**
-     * Constructor for version 2 and above when there is a coordinator or 
group level error
-     * @param topLevelErrorCode Coordinator or group level error code
-     */
-    public OffsetFetchResponse(Errors topLevelError) {
-        this(topLevelError, new ArrayList<TopicPartition>(), CURRENT_VERSION);
-    }
-
     public OffsetFetchResponse(Struct struct) {
         super(struct);
         Errors topLevelError = Errors.NONE;
-        this.responseData = new HashMap<TopicPartition, PartitionData>();
+        this.responseData = new HashMap<>();
         for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
             String topic = topicResponse.getString(TOPIC_KEY_NAME);

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2eeed55..d4913df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1370,7 +1370,7 @@ public class KafkaConsumerTest {
         for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
             partitionData.put(entry.getKey(), new 
OffsetFetchResponse.PartitionData(entry.getValue(), "", error));
         }
-        return new OffsetFetchResponse(partitionData);
+        return new OffsetFetchResponse(Errors.NONE, partitionData);
     }
 
     private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> 
offsets, short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9a8c0b9..ee6afe1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1482,12 +1482,12 @@ public class ConsumerCoordinatorTest {
     }
 
     private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
-        return new OffsetFetchResponse(topLevelError);
+        return new OffsetFetchResponse(topLevelError, 
Collections.<TopicPartition, OffsetFetchResponse.PartitionData>emptyMap());
     }
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors 
partitionLevelError, String metadata, long offset) {
         OffsetFetchResponse.PartitionData data = new 
OffsetFetchResponse.PartitionData(offset, metadata, partitionLevelError);
-        return new OffsetFetchResponse(Collections.singletonMap(tp, data));
+        return new OffsetFetchResponse(Errors.NONE, 
Collections.singletonMap(tp, data));
     }
 
     private OffsetCommitCallback callback(final AtomicBoolean success) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0d3a1a8..a5ed806 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -90,8 +91,13 @@ public class RequestResponseTest {
         checkSerialization(createOffsetCommitResponse(), null);
         checkSerialization(OffsetFetchRequest.forAllPartitions("group1"));
         
checkSerialization(OffsetFetchRequest.forAllPartitions("group1").getErrorResponse(new
 NotCoordinatorForGroupException()), 2);
-        checkSerialization(createOffsetFetchRequest());
-        checkSerialization(createOffsetFetchRequest().getErrorResponse(new 
UnknownServerException()), null);
+        checkSerialization(createOffsetFetchRequest(0));
+        checkSerialization(createOffsetFetchRequest(1));
+        checkSerialization(createOffsetFetchRequest(2));
+        checkSerialization(OffsetFetchRequest.forAllPartitions("group1"));
+        checkSerialization(createOffsetFetchRequest(0).getErrorResponse(new 
UnknownServerException()), 0);
+        checkSerialization(createOffsetFetchRequest(1).getErrorResponse(new 
UnknownServerException()), 1);
+        checkSerialization(createOffsetFetchRequest(2).getErrorResponse(new 
UnknownServerException()), 2);
         checkSerialization(createOffsetFetchResponse(), null);
         checkSerialization(createProduceRequest());
         checkSerialization(createProduceRequest().getErrorResponse(new 
UnknownServerException()), null);
@@ -337,7 +343,7 @@ public class RequestResponseTest {
     }
 
     private DescribeGroupsRequest createDescribeGroupRequest() {
-        return new 
DescribeGroupsRequest.Builder(Collections.singletonList("test-group")).build();
+        return new 
DescribeGroupsRequest.Builder(singletonList("test-group")).build();
     }
 
     private DescribeGroupsResponse createDescribeGroupResponse() {
@@ -428,16 +434,17 @@ public class RequestResponseTest {
         return new OffsetCommitResponse(responseData);
     }
 
-    private OffsetFetchRequest createOffsetFetchRequest() {
-        return new OffsetFetchRequest.Builder("group1",
-                Arrays.asList(new TopicPartition("test11", 1))).build();
+    private OffsetFetchRequest createOffsetFetchRequest(int version) {
+        return new OffsetFetchRequest.Builder("group1", singletonList(new 
TopicPartition("test11", 1)))
+                .setVersion((short) version)
+                .build();
     }
 
     private OffsetFetchResponse createOffsetFetchResponse() {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = 
new HashMap<>();
         responseData.put(new TopicPartition("test", 0), new 
OffsetFetchResponse.PartitionData(100L, "", Errors.NONE));
         responseData.put(new TopicPartition("test", 1), new 
OffsetFetchResponse.PartitionData(100L, null, Errors.NONE));
-        return new OffsetFetchResponse(responseData);
+        return new OffsetFetchResponse(Errors.NONE, responseData);
     }
 
     private ProduceRequest createProduceRequest() {
@@ -544,7 +551,7 @@ public class RequestResponseTest {
     }
 
     private SaslHandshakeResponse createSaslHandshakeResponse() {
-        return new SaslHandshakeResponse(Errors.NONE.code(), 
Collections.singletonList("GSSAPI"));
+        return new SaslHandshakeResponse(Errors.NONE.code(), 
singletonList("GSSAPI"));
     }
 
     private ApiVersionsRequest createApiVersionRequest() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 2908901..dac4cc5 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -25,7 +25,6 @@ import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.OffsetFetchResponse.PARTITION_ERRORS
 
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 2

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 4cbfad6..7abbc6e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -422,7 +422,7 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def doCommitOffsets(group: GroupMetadata,
+  private def doCommitOffsets(group: GroupMetadata,
                       memberId: String,
                       generationId: Int,
                       offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
@@ -455,7 +455,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleFetchOffsets(groupId: String,
-                         partitions: Option[Seq[TopicPartition]]): (Errors, 
Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
+                         partitions: Option[Seq[TopicPartition]] = None): 
(Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
     if (!isActive.get)
       (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Map())
     else if (!isCoordinatorForGroup(groupId)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 74b46ad..2d6889c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -97,7 +97,7 @@ class GroupMetadataManager(val brokerId: Int,
       unit = TimeUnit.MILLISECONDS)
   }
 
-  def currentGroups(): Iterable[GroupMetadata] = groupMetadataCache.values
+  def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
 
   def isPartitionOwned(partition: Int) = inLock(partitionLock) { 
ownedPartitions.contains(partition) }
 
@@ -342,19 +342,24 @@ class GroupMetadataManager(val brokerId: Int,
             (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", 
Errors.NONE))
           }.toMap
         } else {
-            if (topicPartitionsOpt.isEmpty) {
-              // Return offsets for all partitions owned by this consumer 
group. (this only applies to consumers that commit offsets to Kafka.)
-              group.allOffsets.map { case (topicPartition, offsetAndMetadata) 
=>
-                (topicPartition, new 
OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, 
offsetAndMetadata.metadata, Errors.NONE))
-              }
-            } else {
-              topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
-                group.offset(topicPartition) match {
-                  case None => (topicPartition, new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", 
Errors.NONE))
-                  case Some(offsetAndMetadata) =>
-                    (topicPartition, new 
OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, 
offsetAndMetadata.metadata, Errors.NONE))
+            topicPartitionsOpt match {
+              case None =>
+                // Return offsets for all partitions owned by this consumer 
group. (this only applies to consumers
+                // that commit offsets to Kafka.)
+                group.allOffsets.map { case (topicPartition, 
offsetAndMetadata) =>
+                  topicPartition -> new 
OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, 
offsetAndMetadata.metadata, Errors.NONE)
                 }
-              }.toMap
+
+              case Some(topicPartitions) =>
+                topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { 
topicPartition =>
+                  val partitionData = group.offset(topicPartition) match {
+                    case None =>
+                      new 
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", 
Errors.NONE)
+                    case Some(offsetAndMetadata) =>
+                      new 
OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, 
offsetAndMetadata.metadata, Errors.NONE)
+                  }
+                  topicPartition -> partitionData
+                }.toMap
             }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index a6ad7b2..530dafc 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -38,7 +38,7 @@ import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
NotLeaderForPartitionException, TopicExistsException, 
UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, 
SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
 import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -886,38 +886,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, 
responseBody))
   }
 
-  /*
+  /**
    * Handle an offset fetch request
    */
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val header = request.header
     val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
 
+    def authorizeTopicDescribe(partition: TopicPartition) =
+      authorize(request.session, Describe, new Resource(auth.Topic, 
partition.topic))
+
     val offsetFetchResponse =
       // reject the request if not authorized to the group
       if (!authorize(request.session, Read, new Resource(Group, 
offsetFetchRequest.groupId)))
-        new OffsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, 
offsetFetchRequest.partitions, header.apiVersion)
+        offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
       else {
-        val partitions =
-          if (offsetFetchRequest.isAllPartitions)
-            List[TopicPartition]()
-          else
-            offsetFetchRequest.partitions.asScala.toList
-
-        val (authorizedPartitions, unauthorizedPartitions) =
-          partitions.partition { partition => authorize(request.session, 
Describe, new Resource(auth.Topic, partition.topic)) }
-
-        val unknownTopicPartitionResponse = new 
OffsetFetchResponse.PartitionData(
-            OffsetFetchResponse.INVALID_OFFSET, 
OffsetFetchResponse.NO_METADATA, Errors.UNKNOWN_TOPIC_OR_PARTITION)
-        val unauthorizedStatus = unauthorizedPartitions.map(topicPartition => 
(topicPartition, unknownTopicPartitionResponse)).toMap
-
         if (header.apiVersion == 0) {
+          val (authorizedPartitions, unauthorizedPartitions) = 
offsetFetchRequest.partitions.asScala
+            .partition(authorizeTopicDescribe)
+
           // version 0 reads offsets from ZK
-          val responseInfo = authorizedPartitions.map { topicPartition =>
+          val authorizedPartitionData = authorizedPartitions.map { 
topicPartition =>
             val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, 
topicPartition.topic)
             try {
               if (!metadataCache.contains(topicPartition.topic))
-                (topicPartition, unknownTopicPartitionResponse)
+                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
               else {
                 val payloadOpt = 
zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
                 payloadOpt match {
@@ -925,7 +918,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                     (topicPartition, new OffsetFetchResponse.PartitionData(
                         payload.toLong, OffsetFetchResponse.NO_METADATA, 
Errors.NONE))
                   case None =>
-                    (topicPartition, unknownTopicPartitionResponse)
+                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
                 }
               }
             } catch {
@@ -934,43 +927,32 @@ class KafkaApis(val requestChannel: RequestChannel,
                     OffsetFetchResponse.INVALID_OFFSET, 
OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
             }
           }.toMap
-          new OffsetFetchResponse(Errors.NONE, (responseInfo ++ 
unauthorizedStatus).asJava, header.apiVersion)
-        }
-        else {
+
+          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+          new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ 
unauthorizedPartitionData).asJava, header.apiVersion)
+        } else {
           // versions 1 and above read offsets from Kafka
-          val offsets = 
coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
-            if (offsetFetchRequest.isAllPartitions)
-              None
-            else
+          if (offsetFetchRequest.isAllPartitions) {
+            val (error, allPartitionData) = 
coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
+            if (error != Errors.NONE)
+              offsetFetchRequest.getErrorResponse(error)
+            else {
+              // clients are not allowed to see offsets for topics that are 
not authorized for Describe
+              val authorizedPartitionData = allPartitionData.filter { case 
(topicPartition, _) => authorizeTopicDescribe(topicPartition) }
+              new OffsetFetchResponse(Errors.NONE, 
authorizedPartitionData.asJava, header.apiVersion)
+            }
+          } else {
+            val (authorizedPartitions, unauthorizedPartitions) = 
offsetFetchRequest.partitions.asScala
+              .partition(authorizeTopicDescribe)
+            val (error, authorizedPartitionData) = 
coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
               Some(authorizedPartitions))
-
-          // Note that we do not need to filter the partitions in the
-          // metadata cache as the topic partitions will be filtered
-          // in coordinator's offset manager through the offset cache
-          if (header.apiVersion == 1) {
-            val authorizedStatus =
-              if (offsets._1 != Errors.NONE) {
-                authorizedPartitions.map { partition =>
-                  (partition, new OffsetFetchResponse.PartitionData(
-                      OffsetFetchResponse.INVALID_OFFSET, 
OffsetFetchResponse.NO_METADATA, offsets._1))}.toMap
-              }
-              else
-                offsets._2.toMap
-            new OffsetFetchResponse(Errors.NONE, (authorizedStatus ++ 
unauthorizedStatus).asJava, header.apiVersion)
-          }
-          else if (offsets._1 == Errors.NONE) {
-            if (offsetFetchRequest.isAllPartitions) {
-              // filter out unauthorized topics in case all group offsets are 
requested
-              val authorizedStatus = offsets._2.filter {
-                case (partition, _) => authorize(request.session, Describe, 
new Resource(auth.Topic, partition.topic))
-              }
-              new OffsetFetchResponse((authorizedStatus).asJava)
+            if (error != Errors.NONE)
+              offsetFetchRequest.getErrorResponse(error)
+            else {
+              val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> 
OffsetFetchResponse.UNKNOWN_PARTITION).toMap
+              new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ 
unauthorizedPartitionData).asJava, header.apiVersion)
             }
-            else
-              new OffsetFetchResponse((offsets._2.toMap ++ 
unauthorizedStatus).asJava)
           }
-          else
-            new OffsetFetchResponse(offsets._1)
         }
       }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 42251fa..d43d1af 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -40,6 +40,7 @@ import scala.collection.mutable
 import scala.collection.mutable.Buffer
 import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
+import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.MemoryRecords
 
@@ -173,7 +174,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     producers.foreach(_.close())
     consumers.foreach(_.wakeup())
     consumers.foreach(_.close())
-    removeAllAcls
+    removeAllAcls()
     super.tearDown()
   }
 
@@ -712,6 +713,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testFetchAllOffsetsTopicAuthorization() {
+    val offset = 15L
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), topicResource)
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.commitSync(Map(tp -> new 
OffsetAndMetadata(offset)).asJava)
+
+    removeAllAcls()
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
+
+    // send offset fetch requests directly since the consumer does not expose an API to do so
+    // note there's only one broker, so no need to lookup the group coordinator
+
+    // without describe permission on the topic, we shouldn't be able to fetch offsets
+    val offsetFetchRequest = 
requests.OffsetFetchRequest.forAllPartitions(group)
+    var offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, 
anySocketServer)
+    assertEquals(Errors.NONE, offsetFetchResponse.error)
+    assertTrue(offsetFetchResponse.responseData.isEmpty)
+
+    // now add describe permission on the topic and verify that the offset can be fetched
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
+    offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, 
anySocketServer)
+    assertEquals(Errors.NONE, offsetFetchResponse.error)
+    assertTrue(offsetFetchResponse.responseData.containsKey(tp))
+    assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset)
+  }
+
+  @Test
   def testOffsetFetchTopicDescribe() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), topicResource)
@@ -846,4 +875,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }
 
+  private def sendOffsetFetchRequest(request: requests.OffsetFetchRequest,
+                                     socketServer: SocketServer): 
requests.OffsetFetchResponse = {
+    val response = send(request, ApiKeys.OFFSET_FETCH, socketServer)
+    requests.OffsetFetchResponse.parse(response, request.version)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 20e512f..d3de16d 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -25,7 +25,7 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, 
ReplicaManager}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest}
+import org.apache.kafka.common.requests.{JoinGroupRequest, 
OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.{After, Before, Test}
@@ -749,6 +749,62 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testFetchOffsets() {
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
Some(Seq(tp)))
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+  }
+
+  @Test
+  def testFetchOffsetForUnknownPartition(): Unit = {
+    val tp = new TopicPartition("topic", 0)
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
Some(Seq(tp)))
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+  }
+
+  @Test
+  def testFetchOffsetNotCoordinatorForGroup(): Unit = {
+    val tp = new TopicPartition("topic", 0)
+    val (error, partitionData) = 
groupCoordinator.handleFetchOffsets(otherGroupId, Some(Seq(tp)))
+    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+    assertTrue(partitionData.isEmpty)
+  }
+
+  @Test
+  def testFetchAllOffsets() {
+    val tp1 = new TopicPartition("topic", 0)
+    val tp2 = new TopicPartition("topic", 1)
+    val tp3 = new TopicPartition("other-topic", 0)
+    val offset1 = OffsetAndMetadata(15)
+    val offset2 = OffsetAndMetadata(16)
+    val offset3 = OffsetAndMetadata(17)
+
+    assertEquals((Errors.NONE, Map.empty), 
groupCoordinator.handleFetchOffsets(groupId))
+
+    val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, 
tp2 -> offset2, tp3 -> offset3))
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp1))
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp2))
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp3))
+
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId)
+    assertEquals(Errors.NONE, error)
+    assertEquals(3, partitionData.size)
+    assertTrue(partitionData.forall(_._2.error == Errors.NONE))
+    assertEquals(Some(offset1.offset), partitionData.get(tp1).map(_.offset))
+    assertEquals(Some(offset2.offset), partitionData.get(tp2).map(_.offset))
+    assertEquals(Some(offset3.offset), partitionData.get(tp3).map(_.offset))
+  }
+
+  @Test
   def testCommitOffsetInAwaitingSync() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val tp = new TopicPartition("topic", 0)

Reply via email to