Repository: kafka Updated Branches: refs/heads/trunk 55abe65e0 -> 7a84b241e
MINOR: Some cleanups and additional testing for KIP-88 Author: Jason Gustafson <ja...@confluent.io> Reviewers: Vahid Hashemian <vahidhashem...@us.ibm.com>, Ismael Juma <ism...@juma.me.uk> Closes #2383 from hachikuji/minor-cleanup-kip-88 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7a84b241 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7a84b241 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7a84b241 Branch: refs/heads/trunk Commit: 7a84b241eeb8cb63400a9512b066c3f733f94b8c Parents: 55abe65 Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Jan 17 10:42:05 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Jan 17 10:42:05 2017 -0800 ---------------------------------------------------------------------- .../common/requests/OffsetFetchRequest.java | 24 +++++- .../common/requests/OffsetFetchResponse.java | 83 ++++++------------- .../clients/consumer/KafkaConsumerTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 4 +- .../common/requests/RequestResponseTest.java | 23 ++++-- .../scala/kafka/api/OffsetFetchRequest.scala | 1 - .../kafka/coordinator/GroupCoordinator.scala | 4 +- .../coordinator/GroupMetadataManager.scala | 31 ++++--- .../src/main/scala/kafka/server/KafkaApis.scala | 86 ++++++++------------ .../kafka/api/AuthorizerIntegrationTest.scala | 37 ++++++++- .../GroupCoordinatorResponseTest.scala | 58 ++++++++++++- 11 files changed, 211 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 0ff49be..553fd96 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,7 +79,7 @@ public class OffsetFetchRequest extends AbstractRequest { private final List<TopicPartition> partitions; public static OffsetFetchRequest forAllPartitions(String groupId) { - return new OffsetFetchRequest.Builder(groupId, (List<TopicPartition>) null).setVersion((short) 2).build(); + return new OffsetFetchRequest.Builder(groupId, null).setVersion((short) 2).build(); } // v0, v1, and v2 have the same fields. @@ -131,20 +132,35 @@ public class OffsetFetchRequest extends AbstractRequest { groupId = struct.getString(GROUP_ID_KEY_NAME); } - @Override - public AbstractResponse getErrorResponse(Throwable e) { + public OffsetFetchResponse getErrorResponse(Errors error) { short versionId = version(); + + Map<TopicPartition, OffsetFetchResponse.PartitionData> responsePartitions = new HashMap<>(); + if (versionId < 2) { + for (TopicPartition partition : this.partitions) { + responsePartitions.put(partition, new OffsetFetchResponse.PartitionData( + OffsetFetchResponse.INVALID_OFFSET, + OffsetFetchResponse.NO_METADATA, + error)); + } + } + switch (versionId) { case 0: case 1: case 2: - return new OffsetFetchResponse(Errors.forException(e), partitions, versionId); + return new OffsetFetchResponse(error, responsePartitions, versionId); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id))); } } + @Override + public OffsetFetchResponse getErrorResponse(Throwable e) { + return getErrorResponse(Errors.forException(e)); + } + public String groupId() { return groupId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 0095f38..9c14155 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -46,6 +46,8 @@ public class OffsetFetchResponse extends AbstractResponse { public static final long INVALID_OFFSET = -1L; public static final String NO_METADATA = ""; + public static final PartitionData UNKNOWN_PARTITION = new PartitionData(INVALID_OFFSET, NO_METADATA, + Errors.UNKNOWN_TOPIC_OR_PARTITION); /** * Possible error codes: @@ -59,7 +61,7 @@ public class OffsetFetchResponse extends AbstractResponse { * - GROUP_AUTHORIZATION_FAILED (30) */ - public static final List<Errors> PARTITION_ERRORS = Arrays.asList( + private static final List<Errors> PARTITION_ERRORS = Arrays.asList( Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.TOPIC_AUTHORIZATION_FAILED); @@ -82,14 +84,30 @@ public class OffsetFetchResponse extends AbstractResponse { } } - private List<Struct> getTopicArray(Map<TopicPartition, PartitionData> responseData) { - Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); + /** + * Constructor for the latest version. + * @param error Potential coordinator or group level error code + * @param responseData Fetched offset information grouped by topic-partition + */ + public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) { + this(error, responseData, CURRENT_VERSION); + } - List<Struct> topicArray = new ArrayList<Struct>(); + /** + * Unified constructor for all versions. + * @param error Potential coordinator or group level error code (for api version 2 and later) + * @param responseData Fetched offset information grouped by topic-partition + * @param version The request API version + */ + public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData, int version) { + super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version))); + + Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); + List<Struct> topicArray = new ArrayList<>(); for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) { Struct topicData = this.struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List<Struct> partitionArray = new ArrayList<Struct>(); + List<Struct> partitionArray = new ArrayList<>(); for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) { PartitionData fetchPartitionData = partitionEntry.getValue(); Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); @@ -103,66 +121,17 @@ public class OffsetFetchResponse extends AbstractResponse { topicArray.add(topicData); } - return topicArray; - } - - /** - * Unified constructor - * @param responseData Fetched offset information grouped by topic-partition - * @param topLevelErrorCode Potential coordinator or group level error code (for api version 2 and later) - * @param version The request API version - */ - public OffsetFetchResponse(Errors topLevelError, Map<TopicPartition, PartitionData> responseData, int version) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version))); - - this.struct.set(RESPONSES_KEY_NAME, getTopicArray(responseData).toArray()); + this.struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); this.responseData = responseData; - this.error = topLevelError; + this.error = error; if (version > 1) this.struct.set(ERROR_CODE_KEY_NAME, this.error.code()); } - /** - * Unified constructor (used only if there are errors in the response) - * @param partitions partitions to be included in the response - * @param topLevelErrorCode The error code to be reported in the response - * @param version The request API version - */ - public OffsetFetchResponse(Errors topLevelError, List<TopicPartition> partitions, int version) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version))); - - assert topLevelError != Errors.NONE; - this.responseData = new HashMap<>(); - this.error = topLevelError; - if (version < 2) { - for (TopicPartition partition : partitions) { - this.responseData.put(partition, new OffsetFetchResponse.PartitionData( - OffsetFetchResponse.INVALID_OFFSET, - OffsetFetchResponse.NO_METADATA, - topLevelError)); - } - } else - this.struct.set(ERROR_CODE_KEY_NAME, this.error.code()); - - this.struct.set(RESPONSES_KEY_NAME, getTopicArray(this.responseData).toArray()); - } - - public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) { - this(Errors.NONE, responseData, CURRENT_VERSION); - } - - /** - * Constructor for version 2 and above when there is a coordinator or group level error - * @param topLevelErrorCode Coordinator or group level error code - */ - public OffsetFetchResponse(Errors topLevelError) { - this(topLevelError, new ArrayList<TopicPartition>(), CURRENT_VERSION); - } - public OffsetFetchResponse(Struct struct) { super(struct); Errors topLevelError = Errors.NONE; - this.responseData = new HashMap<TopicPartition, PartitionData>(); + this.responseData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2eeed55..d4913df 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1370,7 +1370,7 @@ public class KafkaConsumerTest { for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) { partitionData.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue(), "", error)); } - return new OffsetFetchResponse(partitionData); + return new OffsetFetchResponse(Errors.NONE, partitionData); } private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 9a8c0b9..ee6afe1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1482,12 +1482,12 @@ public class ConsumerCoordinatorTest { } private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) { - return new OffsetFetchResponse(topLevelError); + return new OffsetFetchResponse(topLevelError, Collections.<TopicPartition, OffsetFetchResponse.PartitionData>emptyMap()); } private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) { OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, partitionLevelError); - return new OffsetFetchResponse(Collections.singletonMap(tp, data)); + return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(tp, data)); } private OffsetCommitCallback callback(final AtomicBoolean success) { http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 0d3a1a8..a5ed806 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -90,8 +91,13 @@ public class RequestResponseTest { checkSerialization(createOffsetCommitResponse(), null); checkSerialization(OffsetFetchRequest.forAllPartitions("group1")); checkSerialization(OffsetFetchRequest.forAllPartitions("group1").getErrorResponse(new NotCoordinatorForGroupException()), 2); - checkSerialization(createOffsetFetchRequest()); - checkSerialization(createOffsetFetchRequest().getErrorResponse(new UnknownServerException()), null); + checkSerialization(createOffsetFetchRequest(0)); + checkSerialization(createOffsetFetchRequest(1)); + checkSerialization(createOffsetFetchRequest(2)); + checkSerialization(OffsetFetchRequest.forAllPartitions("group1")); + checkSerialization(createOffsetFetchRequest(0).getErrorResponse(new UnknownServerException()), 0); + checkSerialization(createOffsetFetchRequest(1).getErrorResponse(new UnknownServerException()), 1); + checkSerialization(createOffsetFetchRequest(2).getErrorResponse(new UnknownServerException()), 2); checkSerialization(createOffsetFetchResponse(), null); checkSerialization(createProduceRequest()); checkSerialization(createProduceRequest().getErrorResponse(new UnknownServerException()), null); @@ -337,7 +343,7 @@ public class RequestResponseTest { } private DescribeGroupsRequest createDescribeGroupRequest() { - return new DescribeGroupsRequest.Builder(Collections.singletonList("test-group")).build(); + return new DescribeGroupsRequest.Builder(singletonList("test-group")).build(); } private DescribeGroupsResponse createDescribeGroupResponse() { @@ -428,16 +434,17 @@ public class RequestResponseTest { return new OffsetCommitResponse(responseData); } - private OffsetFetchRequest createOffsetFetchRequest() { - return new OffsetFetchRequest.Builder("group1", - Arrays.asList(new TopicPartition("test11", 1))).build(); + private OffsetFetchRequest createOffsetFetchRequest(int version) { + return new OffsetFetchRequest.Builder("group1", singletonList(new TopicPartition("test11", 1))) + .setVersion((short) version) + .build(); } private OffsetFetchResponse createOffsetFetchResponse() { Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>(); responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE)); responseData.put(new TopicPartition("test", 1), new OffsetFetchResponse.PartitionData(100L, null, Errors.NONE)); - return new OffsetFetchResponse(responseData); + return new OffsetFetchResponse(Errors.NONE, responseData); } private ProduceRequest createProduceRequest() { @@ -544,7 +551,7 @@ public class RequestResponseTest { } private SaslHandshakeResponse createSaslHandshakeResponse() { - return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI")); + return new SaslHandshakeResponse(Errors.NONE.code(), singletonList("GSSAPI")); } private ApiVersionsRequest createApiVersionRequest() { http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/api/OffsetFetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index 2908901..dac4cc5 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -25,7 +25,6 @@ import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.utils.Logging import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.OffsetFetchResponse.PARTITION_ERRORS object OffsetFetchRequest extends Logging { val CurrentVersion: Short = 2 http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 4cbfad6..7abbc6e 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -422,7 +422,7 @@ class GroupCoordinator(val brokerId: Int, } } - def doCommitOffsets(group: GroupMetadata, + private def doCommitOffsets(group: GroupMetadata, memberId: String, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], @@ -455,7 +455,7 @@ class GroupCoordinator(val brokerId: Int, } def handleFetchOffsets(groupId: String, - partitions: Option[Seq[TopicPartition]]): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = { + partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = { if (!isActive.get) (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Map()) else if (!isCoordinatorForGroup(groupId)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 74b46ad..2d6889c 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -97,7 +97,7 @@ class GroupMetadataManager(val brokerId: Int, unit = TimeUnit.MILLISECONDS) } - def currentGroups(): Iterable[GroupMetadata] = groupMetadataCache.values + def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) } @@ -342,19 +342,24 @@ class GroupMetadataManager(val brokerId: Int, (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)) }.toMap } else { - if (topicPartitionsOpt.isEmpty) { - // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.) - group.allOffsets.map { case (topicPartition, offsetAndMetadata) => - (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)) - } - } else { - topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => - group.offset(topicPartition) match { - case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)) - case Some(offsetAndMetadata) => - (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)) + topicPartitionsOpt match { + case None => + // Return offsets for all partitions owned by this consumer group. (this only applies to consumers + // that commit offsets to Kafka.) + group.allOffsets.map { case (topicPartition, offsetAndMetadata) => + topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE) } - }.toMap + + case Some(topicPartitions) => + topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition => + val partitionData = group.offset(topicPartition) match { + case None => + new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE) + case Some(offsetAndMetadata) => + new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE) + } + topicPartition -> partitionData + }.toMap } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a6ad7b2..530dafc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -38,7 +38,7 @@ import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} import org.apache.kafka.common.record.{MemoryRecords, Record} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -886,38 +886,31 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) } - /* + /** * Handle an offset fetch request */ def handleOffsetFetchRequest(request: RequestChannel.Request) { val header = request.header val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest] + def authorizeTopicDescribe(partition: TopicPartition) = + authorize(request.session, Describe, new Resource(auth.Topic, partition.topic)) + val offsetFetchResponse = // reject the request if not authorized to the group if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) - new OffsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, offsetFetchRequest.partitions, header.apiVersion) + offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED) else { - val partitions = - if (offsetFetchRequest.isAllPartitions) - List[TopicPartition]() - else - offsetFetchRequest.partitions.asScala.toList - - val (authorizedPartitions, unauthorizedPartitions) = - partitions.partition { partition => authorize(request.session, Describe, new Resource(auth.Topic, partition.topic)) } - - val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData( - OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.UNKNOWN_TOPIC_OR_PARTITION) - val unauthorizedStatus = unauthorizedPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap - if (header.apiVersion == 0) { + val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala + .partition(authorizeTopicDescribe) + // version 0 reads offsets from ZK - val responseInfo = authorizedPartitions.map { topicPartition => + val authorizedPartitionData = authorizedPartitions.map { topicPartition => val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic) try { if (!metadataCache.contains(topicPartition.topic)) - (topicPartition, unknownTopicPartitionResponse) + (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) else { val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1 payloadOpt match { @@ -925,7 +918,7 @@ class KafkaApis(val requestChannel: RequestChannel, (topicPartition, new OffsetFetchResponse.PartitionData( payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE)) case None => - (topicPartition, unknownTopicPartitionResponse) + (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) } } } catch { @@ -934,43 +927,32 @@ class KafkaApis(val requestChannel: RequestChannel, OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e))) } }.toMap - new OffsetFetchResponse(Errors.NONE, (responseInfo ++ unauthorizedStatus).asJava, header.apiVersion) - } - else { + + val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap + new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion) + } else { // versions 1 and above read offsets from Kafka - val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, - if (offsetFetchRequest.isAllPartitions) - None - else + if (offsetFetchRequest.isAllPartitions) { + val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId) + if (error != Errors.NONE) + offsetFetchRequest.getErrorResponse(error) + else { + // clients are not allowed to see offsets for topics that are not authorized for Describe + val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) } + new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava, header.apiVersion) + } + } else { + val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala + .partition(authorizeTopicDescribe) + val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, Some(authorizedPartitions)) - - // Note that we do not need to filter the partitions in the - // metadata cache as the topic partitions will be filtered - // in coordinator's offset manager through the offset cache - if (header.apiVersion == 1) { - val authorizedStatus = - if (offsets._1 != Errors.NONE) { - authorizedPartitions.map { partition => - (partition, new OffsetFetchResponse.PartitionData( - OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, offsets._1))}.toMap - } - else - offsets._2.toMap - new OffsetFetchResponse(Errors.NONE, (authorizedStatus ++ unauthorizedStatus).asJava, header.apiVersion) - } - else if (offsets._1 == Errors.NONE) { - if (offsetFetchRequest.isAllPartitions) { - // filter out unauthorized topics in case all group offsets are requested - val authorizedStatus = offsets._2.filter { - case (partition, _) => authorize(request.session, Describe, new Resource(auth.Topic, partition.topic)) - } - new OffsetFetchResponse((authorizedStatus).asJava) + if (error != Errors.NONE) + offsetFetchRequest.getErrorResponse(error) + else { + val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap + new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion) } - else - new OffsetFetchResponse((offsets._2.toMap ++ unauthorizedStatus).asJava) } - else - new OffsetFetchResponse(offsets._1) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 42251fa..d43d1af 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -40,6 +40,7 @@ import scala.collection.mutable import scala.collection.mutable.Buffer import org.apache.kafka.common.KafkaException import kafka.admin.AdminUtils +import kafka.network.SocketServer import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.MemoryRecords @@ -173,7 +174,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { producers.foreach(_.close()) consumers.foreach(_.wakeup()) consumers.foreach(_.close()) - removeAllAcls + removeAllAcls() super.tearDown() } @@ -712,6 +713,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test + def testFetchAllOffsetsTopicAuthorization() { + val offset = 15L + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(offset)).asJava) + + removeAllAcls() + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + + // send offset fetch requests directly since the consumer does not expose an API to do so + // note there's only one broker, so no need to lookup the group coordinator + + // without describe permission on the topic, we shouldn't be able to fetch offsets + val offsetFetchRequest = requests.OffsetFetchRequest.forAllPartitions(group) + var offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, anySocketServer) + assertEquals(Errors.NONE, offsetFetchResponse.error) + assertTrue(offsetFetchResponse.responseData.isEmpty) + + // now add describe permission on the topic and verify that the offset can be fetched + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + offsetFetchResponse = sendOffsetFetchRequest(offsetFetchRequest, anySocketServer) + assertEquals(Errors.NONE, offsetFetchResponse.error) + assertTrue(offsetFetchResponse.responseData.containsKey(tp)) + assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset) + } + + @Test def testOffsetFetchTopicDescribe() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) @@ -846,4 +875,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + private def sendOffsetFetchRequest(request: requests.OffsetFetchRequest, + socketServer: SocketServer): requests.OffsetFetchResponse = { + val response = send(request, ApiKeys.OFFSET_FETCH, socketServer) + requests.OffsetFetchResponse.parse(response, request.version) + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/7a84b241/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index 20e512f..d3de16d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -25,7 +25,7 @@ import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager} import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest} +import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.{After, Before, Test} @@ -749,6 +749,62 @@ class GroupCoordinatorResponseTest extends JUnitSuite { } @Test + def testFetchOffsets() { + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE.code, commitOffsetResult(tp)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(0), partitionData.get(tp).map(_.offset)) + } + + @Test + def testFetchOffsetForUnknownPartition(): Unit = { + val tp = new TopicPartition("topic", 0) + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) + } + + @Test + def testFetchOffsetNotCoordinatorForGroup(): Unit = { + val tp = new TopicPartition("topic", 0) + val (error, partitionData) = groupCoordinator.handleFetchOffsets(otherGroupId, Some(Seq(tp))) + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error) + assertTrue(partitionData.isEmpty) + } + + @Test + def testFetchAllOffsets() { + val tp1 = new TopicPartition("topic", 0) + val tp2 = new TopicPartition("topic", 1) + val tp3 = new TopicPartition("other-topic", 0) + val offset1 = OffsetAndMetadata(15) + val offset2 = OffsetAndMetadata(16) + val offset3 = OffsetAndMetadata(17) + + assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId)) + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3)) + assertEquals(Errors.NONE.code, commitOffsetResult(tp1)) + assertEquals(Errors.NONE.code, commitOffsetResult(tp2)) + assertEquals(Errors.NONE.code, commitOffsetResult(tp3)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId) + assertEquals(Errors.NONE, error) + assertEquals(3, partitionData.size) + assertTrue(partitionData.forall(_._2.error == Errors.NONE)) + assertEquals(Some(offset1.offset), partitionData.get(tp1).map(_.offset)) + assertEquals(Some(offset2.offset), partitionData.get(tp2).map(_.offset)) + assertEquals(Some(offset3.offset), partitionData.get(tp3).map(_.offset)) + } + + @Test def testCommitOffsetInAwaitingSync() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID val tp = new TopicPartition("topic", 0)