Repository: kafka Updated Branches: refs/heads/1.0 e409f847f -> e671c1cd3
MINOR: Avoid some unnecessary collection copies in KafkaApis Author: Jason Gustafson <ja...@confluent.io> Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #4035 from hachikuji/KAFKA-5547-followup and squashes the following commits: f6b04ce1a [Jason Gustafson] Add a couple missed common fields d3473b14d [Jason Gustafson] Fix compilation errors and a few warnings 58a0ae695 [Jason Gustafson] MINOR: Avoid some unnecessary collection copies in KafkaApis (cherry picked from commit 1027ff3c769906b7b80582217cf5b4703fd6864d) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e671c1cd Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e671c1cd Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e671c1cd Branch: refs/heads/1.0 Commit: e671c1cd36caf3b767fb9cc4af58c25918292afe Parents: e409f84 Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Oct 10 15:01:10 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Oct 10 15:03:56 2017 -0700 ---------------------------------------------------------------------- .../kafka/common/protocol/CommonFields.java | 2 + .../common/requests/FindCoordinatorRequest.java | 13 +- .../common/requests/InitProducerIdRequest.java | 9 +- .../kafka/common/requests/ProduceRequest.java | 13 +- .../kafka/admin/ReassignPartitionsCommand.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 180 ++++++++----------- core/src/main/scala/kafka/utils/ZkUtils.scala | 3 +- .../main/scala/kafka/utils/json/JsonValue.scala | 2 - 8 files changed, 96 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java index 472a791..e1d1884 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java @@ -35,6 +35,8 @@ public class CommonFields { // Transactional APIs public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction."); + public static final Field.NullableStr NULLABLE_TRANSACTIONAL_ID = new Field.NullableStr("transactional_id", + "The transactional id or null if the producer is not transactional"); public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id."); public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id."); http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java index c94bcde..9bfc968 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java @@ -26,16 +26,15 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; import static org.apache.kafka.common.protocol.types.Type.INT8; import static org.apache.kafka.common.protocol.types.Type.STRING; public class FindCoordinatorRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key"; private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type"; - private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema( - new Field("group_id", STRING, "The unique group id.")); + private static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(GROUP_ID); private static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema( new Field("coordinator_key", STRING, "Id to use for finding the coordinator (for groups, this is the groupId, " + @@ -102,8 +101,8 @@ public class FindCoordinatorRequest extends AbstractRequest { this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME)); else this.coordinatorType = CoordinatorType.GROUP; - if (struct.hasField(GROUP_ID_KEY_NAME)) - this.coordinatorKey = struct.getString(GROUP_ID_KEY_NAME); + if (struct.hasField(GROUP_ID)) + this.coordinatorKey = struct.get(GROUP_ID); else this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME); } @@ -138,8 +137,8 @@ public class FindCoordinatorRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version())); - if (struct.hasField(GROUP_ID_KEY_NAME)) - struct.set(GROUP_ID_KEY_NAME, coordinatorKey); + if (struct.hasField(GROUP_ID)) + struct.set(GROUP_ID, coordinatorKey); else struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey); if (struct.hasField(COORDINATOR_TYPE_KEY_NAME)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java index fa14a97..6c659ff 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdRequest.java @@ -24,17 +24,16 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID; import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; public class InitProducerIdRequest extends AbstractRequest { public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE; - private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms"; private static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema( - new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional id whose producer id we want to retrieve or generate."), + NULLABLE_TRANSACTIONAL_ID, new Field(TRANSACTION_TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for before aborting idle transactions sent by this producer.")); public static Schema[] schemaVersions() { @@ -79,7 +78,7 @@ public class InitProducerIdRequest extends AbstractRequest { public InitProducerIdRequest(Struct struct, short version) { super(version); - this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); + this.transactionalId = struct.get(NULLABLE_TRANSACTIONAL_ID); this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME); } @@ -109,7 +108,7 @@ public class InitProducerIdRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version())); - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); + struct.set(NULLABLE_TRANSACTIONAL_ID, transactionalId); struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index ee4a2e2..91e3aeb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.CommonFields; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -39,15 +40,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.NULLABLE_TRANSACTIONAL_ID; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; import static org.apache.kafka.common.protocol.types.Type.INT16; import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; import static org.apache.kafka.common.protocol.types.Type.RECORDS; public class ProduceRequest extends AbstractRequest { - private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; private static final String ACKS_KEY_NAME = "acks"; private static final String TIMEOUT_KEY_NAME = "timeout"; private static final String TOPIC_DATA_KEY_NAME = "topic_data"; @@ -87,8 +87,7 @@ public class ProduceRequest extends AbstractRequest { // Produce request V3 adds the transactional id which is used for authorization when attempting to write // transactional data. This version also adds support for message format V2. private static final Schema PRODUCE_REQUEST_V3 = new Schema( - new Field(TRANSACTIONAL_ID_KEY_NAME, NULLABLE_STRING, "The transactional ID of the producer. This is used to " + - "authorize transaction produce requests. This can be null for non-transactional producers."), + CommonFields.NULLABLE_TRANSACTIONAL_ID, new Field(ACKS_KEY_NAME, INT16, "The number of acknowledgments the producer requires the leader to have " + "received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 " + "for only the leader and -1 for the full ISR."), @@ -229,7 +228,7 @@ public class ProduceRequest extends AbstractRequest { partitionSizes = createPartitionSizes(partitionRecords); acks = struct.getShort(ACKS_KEY_NAME); timeout = struct.getInt(TIMEOUT_KEY_NAME); - transactionalId = struct.hasField(TRANSACTIONAL_ID_KEY_NAME) ? struct.getString(TRANSACTIONAL_ID_KEY_NAME) : null; + transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); } private void validateRecords(short version, MemoryRecords records) { @@ -268,9 +267,7 @@ public class ProduceRequest extends AbstractRequest { Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords); struct.set(ACKS_KEY_NAME, acks); struct.set(TIMEOUT_KEY_NAME, timeout); - - if (struct.hasField(TRANSACTIONAL_ID_KEY_NAME)) - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); + struct.setIfExists(NULLABLE_TRANSACTIONAL_ID, transactionalId); List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size()); for (Map.Entry<String, Map<Integer, MemoryRecords>> topicEntry : recordsByTopic.entrySet()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index af81697..33884d6 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.TopicPartitionReplica import org.apache.kafka.common.errors.{LogDirNotFoundException, ReplicaNotAvailableException} -import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, DescribeReplicaLogDirsResult, AdminClient => JAdminClient} +import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient} import LogConfig._ import joptsimple.OptionParser import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8eceaa0..0066702 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -265,25 +265,24 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(request, requestThrottleMs => new OffsetCommitResponse(requestThrottleMs, results.asJava)) } else { - var unauthorizedTopics = Set[TopicPartition]() - var nonExistingTopics = Set[TopicPartition]() - var authorizedTopics = mutable.Map[TopicPartition, OffsetCommitRequest.PartitionData]() + val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() + val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() + val authorizedTopicRequestInfoBldr = immutable.Map.newBuilder[TopicPartition, OffsetCommitRequest.PartitionData] - for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala.toMap) { + for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) { if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) - unauthorizedTopics += topicPartition + unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopics += topicPartition + nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION) else - authorizedTopics += (topicPartition -> partitionData) + authorizedTopicRequestInfoBldr += (topicPartition -> partitionData) } + val authorizedTopicRequestInfo = authorizedTopicRequestInfoBldr.result() + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) { - val combinedCommitStatus = commitStatus ++ - unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ - nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) - + val combinedCommitStatus = commitStatus ++ unauthorizedTopicErrors ++ nonExistingTopicErrors if (isDebugEnabled) combinedCommitStatus.foreach { case (topicPartition, error) => if (error != Errors.NONE) { @@ -295,11 +294,11 @@ class KafkaApis(val requestChannel: RequestChannel, new OffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) } - if (authorizedTopics.isEmpty) + if (authorizedTopicRequestInfo.isEmpty) sendResponseCallback(Map.empty) else if (header.apiVersion == 0) { // for version 0 always store offsets to ZK - val responseInfo = authorizedTopics.map { + val responseInfo = authorizedTopicRequestInfo.map { case (topicPartition, partitionData) => val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { @@ -313,7 +312,7 @@ class KafkaApis(val requestChannel: RequestChannel, case e: Throwable => (topicPartition, Errors.forException(e)) } } - sendResponseCallback(responseInfo.toMap) + sendResponseCallback(responseInfo) } else { // for version 1 and beyond store offsets in offset manager @@ -334,7 +333,7 @@ class KafkaApis(val requestChannel: RequestChannel, // - If v2 we use the default expiration timestamp val currentTimestamp = time.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val partitionData = authorizedTopics.mapValues { partitionData => + val partitionData = authorizedTopicRequestInfo.mapValues { partitionData => val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata new OffsetAndMetadata( offsetMetadata = OffsetMetadata(partitionData.offset, metadata), @@ -353,7 +352,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.groupId, offsetCommitRequest.memberId, offsetCommitRequest.generationId, - partitionData.toMap, + partitionData, sendResponseCallback) } } @@ -381,15 +380,15 @@ class KafkaApis(val requestChannel: RequestChannel, return } - var unauthorizedTopics = Set[TopicPartition]() - var nonExistingTopics = Set[TopicPartition]() - var authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() + val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() + val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]() + val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]() for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) - unauthorizedTopics += topicPartition + unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopics += topicPartition + nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) else authorizedRequestInfo += (topicPartition -> memoryRecords) } @@ -397,10 +396,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - val mergedResponseStatus = responseStatus ++ - unauthorizedTopics.map(_ -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++ - nonExistingTopics.map(_ -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)) - + val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses var errorInResponse = false mergedResponseStatus.foreach { case (topicPartition, status) => @@ -483,29 +479,23 @@ class KafkaApis(val requestChannel: RequestChannel, val versionId = request.header.apiVersion val clientId = request.header.clientId - var unauthorizedTopics = Set[TopicPartition]() - var nonExistingTopics = Set[TopicPartition]() - var authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() + val unauthorizedTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]() + val nonExistingTopicResponseData = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData)]() + val authorizedRequestInfo = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]() for ((topicPartition, partitionData) <- fetchRequest.fetchData.asScala) { if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) - unauthorizedTopics += topicPartition + unauthorizedTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, + FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopics += topicPartition + nonExistingTopicResponseData += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, + FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) else authorizedRequestInfo += (topicPartition -> partitionData) } - val nonExistingPartitionData = nonExistingTopics.map { - case tp => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)) - } - - val unauthorizedForReadPartitionData = unauthorizedTopics.map { - case tp => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, - FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)) - } - def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = { // Down-conversion of the fetched records is needed when the stored magic version is @@ -547,8 +537,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val mergedPartitionData = partitionData ++ unauthorizedForReadPartitionData ++ nonExistingPartitionData - + val mergedPartitionData = partitionData ++ unauthorizedTopicResponseData ++ nonExistingTopicResponseData val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]() mergedPartitionData.foreach { case (topicPartition, data) => @@ -1393,23 +1382,22 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDeleteTopicsRequest(request: RequestChannel.Request) { val deleteTopicRequest = request.body[DeleteTopicsRequest] - var unauthorizedTopics = Set[String]() - var nonExistingTopics = Set[String]() - var authorizedForDeleteTopics = Set[String]() + val unauthorizedTopicErrors = mutable.Map[String, Errors]() + val nonExistingTopicErrors = mutable.Map[String, Errors]() + val authorizedForDeleteTopics = mutable.Set[String]() for (topic <- deleteTopicRequest.topics.asScala) { if (!authorize(request.session, Delete, new Resource(Topic, topic))) - unauthorizedTopics += topic + unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED else if (!metadataCache.contains(topic)) - nonExistingTopics += topic + nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION else - authorizedForDeleteTopics += topic + authorizedForDeleteTopics.add(topic) } - def sendResponseCallback(results: Map[String, Errors]): Unit = { + def sendResponseCallback(authorizedTopicErrors: Map[String, Errors]): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { - val completeResults = unauthorizedTopics.map(topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ - nonExistingTopics.map(topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ results + val completeResults = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ authorizedTopicErrors val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava) trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") responseBody @@ -1439,28 +1427,24 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDeleteRecordsRequest(request: RequestChannel.Request) { val deleteRecordsRequest = request.body[DeleteRecordsRequest] - var unauthorizedTopics = Set[TopicPartition]() - var nonExistingTopics = Set[TopicPartition]() - var authorizedForDeleteTopics = mutable.Map[TopicPartition, Long]() + val unauthorizedTopicResponses = mutable.Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]() + val nonExistingTopicResponses = mutable.Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]() + val authorizedForDeleteTopicOffsets = mutable.Map[TopicPartition, Long]() for ((topicPartition, offset) <- deleteRecordsRequest.partitionOffsets.asScala) { if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))) - unauthorizedTopics += topicPartition + unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse( + DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED) else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopics += topicPartition + nonExistingTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse( + DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION) else - authorizedForDeleteTopics += (topicPartition -> offset) + authorizedForDeleteTopicOffsets += (topicPartition -> offset) } // the callback for sending a DeleteRecordsResponse - def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) { - - val mergedResponseStatus = responseStatus ++ - unauthorizedTopics.map(_ -> - new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++ - nonExistingTopics.map(_ -> - new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION)) - + def sendResponseCallback(authorizedTopicResponses: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) { + val mergedResponseStatus = authorizedTopicResponses ++ unauthorizedTopicResponses ++ nonExistingTopicResponses mergedResponseStatus.foreach { case (topicPartition, status) => if (status.error != Errors.NONE) { debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format( @@ -1475,13 +1459,13 @@ class KafkaApis(val requestChannel: RequestChannel, new DeleteRecordsResponse(requestThrottleMs, mergedResponseStatus.asJava)) } - if (authorizedForDeleteTopics.isEmpty) + if (authorizedForDeleteTopicOffsets.isEmpty) sendResponseCallback(Map.empty) else { // call the replica manager to append messages to the replicas replicaManager.deleteRecords( deleteRecordsRequest.timeout.toLong, - authorizedForDeleteTopics.mapValues(_.toLong), + authorizedForDeleteTopicOffsets, sendResponseCallback) } } @@ -1650,45 +1634,38 @@ class KafkaApis(val requestChannel: RequestChannel, ensureInterBrokerVersion(KAFKA_0_11_0_IV0) val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] val transactionalId = addPartitionsToTxnRequest.transactionalId - val partitionsToAdd = addPartitionsToTxnRequest.partitions + val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) sendResponseMaybeThrottle(request, requestThrottleMs => addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) else { - val internalTopics = partitionsToAdd.asScala.filter {tp => org.apache.kafka.common.internals.Topic.isInternal(tp.topic())} - - var unauthorizedTopics = Set[TopicPartition]() - var nonExistingTopics = Set[TopicPartition]() - var authorizedPartitions = Set[TopicPartition]() - - for ( topicPartition <- partitionsToAdd.asScala) { - if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) - unauthorizedTopics += topicPartition + val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() + val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() + val authorizedPartitions = mutable.Set[TopicPartition]() + + for (topicPartition <- partitionsToAdd) { + if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) || + !authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) + unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopics += topicPartition + nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION else - authorizedPartitions += topicPartition + authorizedPartitions.add(topicPartition) } - if (unauthorizedTopics.nonEmpty - || nonExistingTopics.nonEmpty - || internalTopics.nonEmpty) { - + if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded // the authorization check to indicate that they were not added to the transaction. - val partitionErrors = (unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ - nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) ++ - internalTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)).toMap - + val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ + authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) sendResponseMaybeThrottle(request, requestThrottleMs => new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) } else { def sendResponseCallback(error: Errors): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs, - partitionsToAdd.asScala.map{tp => (tp, error)}.toMap.asJava) + partitionsToAdd.map{tp => (tp, error)}.toMap.asJava) trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}") responseBody } @@ -1699,7 +1676,7 @@ class KafkaApis(val requestChannel: RequestChannel, txnCoordinator.handleAddPartitionsToTransaction(transactionalId, addPartitionsToTxnRequest.producerId, addPartitionsToTxnRequest.producerEpoch, - partitionsToAdd.asScala.toSet, + authorizedPartitions, sendResponseCallback) } } @@ -1749,25 +1726,22 @@ class KafkaApis(val requestChannel: RequestChannel, else if (!authorize(request.session, Read, new Resource(Group, txnOffsetCommitRequest.consumerGroupId))) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else { - var unauthorizedTopics = Set[TopicPartition]() - var nonExistingTopics = Set[TopicPartition]() - var authorizedTopics = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() + val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() + val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() + val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) { if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) - unauthorizedTopics += topicPartition + unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED else if (!metadataCache.contains(topicPartition.topic)) - nonExistingTopics += topicPartition + nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION else - authorizedTopics += (topicPartition -> commitedOffset) + authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset) } // the callback for sending an offset commit response - def sendResponseCallback(commitStatus: Map[TopicPartition, Errors]) { - val combinedCommitStatus = commitStatus ++ - unauthorizedTopics.map(_ -> Errors.TOPIC_AUTHORIZATION_FAILED) ++ - nonExistingTopics.map(_ -> Errors.UNKNOWN_TOPIC_OR_PARTITION) - + def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]) { + val combinedCommitStatus = authorizedTopicErrors ++ unauthorizedTopicErrors ++ nonExistingTopicErrors if (isDebugEnabled) combinedCommitStatus.foreach { case (topicPartition, error) => if (error != Errors.NONE) { @@ -1779,10 +1753,10 @@ class KafkaApis(val requestChannel: RequestChannel, new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) } - if (authorizedTopics.isEmpty) + if (authorizedTopicCommittedOffsets.isEmpty) sendResponseCallback(Map.empty) else { - val offsetMetadata = convertTxnOffsets(authorizedTopics.toMap) + val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap) groupCoordinator.handleTxnCommitOffsets( txnOffsetCommitRequest.consumerGroupId, txnOffsetCommitRequest.producerId, http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 755f500..0d97ce9 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -39,7 +39,6 @@ import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.{ACL, Stat} import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper} -import kafka.utils.Json._ import scala.collection._ import scala.collection.JavaConverters._ @@ -494,7 +493,7 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper, /** * Create an ephemeral node with the given path and data. Create parents if necessary. */ - private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL] = UseDefaultAcls): Unit = { + private def createEphemeralPath(path: String, data: String, acls: java.util.List[ACL]): Unit = { val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls try { zkPath.createEphemeral(path, data, acl) http://git-wip-us.apache.org/repos/asf/kafka/blob/e671c1cd/core/src/main/scala/kafka/utils/json/JsonValue.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/json/JsonValue.scala b/core/src/main/scala/kafka/utils/json/JsonValue.scala index 2be1880..cbc82c0 100644 --- a/core/src/main/scala/kafka/utils/json/JsonValue.scala +++ b/core/src/main/scala/kafka/utils/json/JsonValue.scala @@ -17,8 +17,6 @@ package kafka.utils.json -import scala.collection._ - import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode} import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}