Repository: kafka Updated Branches: refs/heads/trunk fb86cf633 -> aa365639b
kafka-2115; Error updating metrics in RequestChannel; patched by Gwen Shapira; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa365639 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa365639 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa365639 Branch: refs/heads/trunk Commit: aa365639b2dd36d8feff761f09208154ce71964d Parents: fb86cf6 Author: Gwen Shapira <[email protected]> Authored: Mon Apr 13 09:21:53 2015 -0500 Committer: Jun Rao <[email protected]> Committed: Mon Apr 13 09:21:53 2015 -0500 ---------------------------------------------------------------------- .../apache/kafka/common/protocol/ApiKeys.java | 24 +++++++++++--------- .../apache/kafka/common/protocol/Protocol.java | 4 ++++ .../scala/kafka/network/RequestChannel.scala | 6 ++--- 3 files changed, 20 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aa365639/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 07aba71..b39e9bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -20,17 +20,19 @@ package org.apache.kafka.common.protocol; * Identifiers for all the Kafka APIs */ public enum ApiKeys { - PRODUCE(0, "produce"), - FETCH(1, "fetch"), - LIST_OFFSETS(2, "list_offsets"), - METADATA(3, "metadata"), - LEADER_AND_ISR(4, "leader_and_isr"), - STOP_REPLICA(5, "stop_replica"), - OFFSET_COMMIT(8, "offset_commit"), - OFFSET_FETCH(9, "offset_fetch"), - CONSUMER_METADATA(10, "consumer_metadata"), - JOIN_GROUP(11, "join_group"), - HEARTBEAT(12, "heartbeat"); + PRODUCE(0, "Produce"), + FETCH(1, "Fetch"), + LIST_OFFSETS(2, "Offsets"), + METADATA(3, "Metadata"), + LEADER_AND_ISR(4, "LeaderAndIsr"), + STOP_REPLICA(5, "StopReplica"), + UPDATE_METADATA_KEY(6, "UpdateMetadata"), + CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), + OFFSET_COMMIT(8, "OffsetCommit"), + OFFSET_FETCH(9, "OffsetFetch"), + CONSUMER_METADATA(10, "ConsumerMetadata"), + JOIN_GROUP(11, "JoinGroup"), + HEARTBEAT(12, "Heartbeat"); private static ApiKeys[] codeToType; public static final int MAX_API_KEY; http://git-wip-us.apache.org/repos/asf/kafka/blob/aa365639/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 9c4518e..d53fe45 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -404,6 +404,8 @@ public class Protocol { REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; @@ -416,6 +418,8 @@ public class Protocol { RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; http://git-wip-us.apache.org/repos/asf/kafka/blob/aa365639/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1d9c57b..1d0024c 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet import java.net._ -import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger @@ -82,7 +82,7 @@ object RequestChannel extends Logging { val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L) val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L) val totalTime = endTimeMs - startTimeMs - var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId))) + var metricsList = List(RequestMetrics.metricsMap(ApiKeys.forId(requestId).name)) if (requestId == RequestKeys.FetchKey) { val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower metricsList ::= ( if (isFromFollower) @@ -207,7 +207,7 @@ object RequestMetrics { val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics] val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer" val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower" - (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1) + (ApiKeys.values().toList.map(e => e.name) ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) }
