http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 36b0c86..891896a 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -46,7 +46,7 @@ class GroupCoordinator(val brokerId: Int, val joinPurgatory: DelayedOperationPurgatory[DelayedJoin], time: Time) extends Logging { type JoinCallback = JoinGroupResult => Unit - type SyncCallback = (Array[Byte], Short) => Unit + type SyncCallback = (Array[Byte], Errors) => Unit this.logIdent = "[GroupCoordinator " + brokerId + "]: " @@ -99,16 +99,16 @@ class GroupCoordinator(val brokerId: Int, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback) { if (!isActive.get) { - responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)) + responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)) } else if (!validGroupId(groupId)) { - responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code)) + responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID)) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code)) + responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP)) } else if (isCoordinatorLoadingInProgress(groupId)) { - responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code)) + responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS)) } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs || sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) { - responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code)) + responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT)) } else { // only try to create the group if the group is not unknown AND // the member id is UNKNOWN, if member is specified but group does not @@ -116,7 +116,7 @@ class GroupCoordinator(val brokerId: Int, groupManager.getGroup(groupId) match { case None => if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { - responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID)) } else { val group = groupManager.addGroup(new GroupMetadata(groupId)) doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) @@ -140,11 +140,11 @@ class GroupCoordinator(val brokerId: Int, group synchronized { if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) { // if the new member does not support the group protocol, reject it - responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code)) + responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL)) } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { // if the member trying to register with a un-recognized id, send the response to let // it reset its member id and retry - responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID)) } else { group.currentState match { case Dead => @@ -152,7 +152,7 @@ class GroupCoordinator(val brokerId: Int, // from the coordinator metadata; this is likely that the group has migrated to some other // coordinator OR the group is in a transient unstable phase. Let the member retry // joining without the specified member id, - responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID)) case PreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { @@ -181,7 +181,7 @@ class GroupCoordinator(val brokerId: Int, generationId = group.generationId, subProtocol = group.protocol, leaderId = group.leaderId, - errorCode = Errors.NONE.code)) + error = Errors.NONE)) } else { // member has changed metadata, so force a rebalance updateMemberAndRebalance(group, member, protocols, responseCallback) @@ -208,7 +208,7 @@ class GroupCoordinator(val brokerId: Int, generationId = group.generationId, subProtocol = group.protocol, leaderId = group.leaderId, - errorCode = Errors.NONE.code)) + error = Errors.NONE)) } } } @@ -225,12 +225,12 @@ class GroupCoordinator(val brokerId: Int, groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback) { if (!isActive.get) { - responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) + responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code) + responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP) } else { groupManager.getGroup(groupId) match { - case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback) } } @@ -245,16 +245,16 @@ class GroupCoordinator(val brokerId: Int, group synchronized { if (!group.has(memberId)) { - responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) } else if (generationId != group.generationId) { - responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code) + responseCallback(Array.empty, Errors.ILLEGAL_GENERATION) } else { group.currentState match { case Empty | Dead => - responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID) case PreparingRebalance => - responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code) + responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS) case AwaitingSync => group.get(memberId).awaitingSyncCallback = responseCallback @@ -288,7 +288,7 @@ class GroupCoordinator(val brokerId: Int, case Stable => // if the group is stable, we just return the current assignment val memberMetadata = group.get(memberId) - responseCallback(memberMetadata.assignment, Errors.NONE.code) + responseCallback(memberMetadata.assignment, Errors.NONE) completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) } } @@ -300,13 +300,13 @@ class GroupCoordinator(val brokerId: Int, delayedGroupStore.foreach(groupManager.store) } - def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) { + def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) { if (!isActive.get) { - responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) + responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) + responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP) } else if (isCoordinatorLoadingInProgress(groupId)) { - responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code) + responseCallback(Errors.GROUP_LOAD_IN_PROGRESS) } else { groupManager.getGroup(groupId) match { case None => @@ -314,17 +314,17 @@ class GroupCoordinator(val brokerId: Int, // from the coordinator metadata; this is likely that the group has migrated to some other // coordinator OR the group is in a transient unstable phase. Let the consumer to retry // joining without specified consumer id, - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) case Some(group) => group synchronized { if (group.is(Dead) || !group.has(memberId)) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) } else { val member = group.get(memberId) removeHeartbeatForLeavingMember(group, member) onMemberFailure(group, member) - responseCallback(Errors.NONE.code) + responseCallback(Errors.NONE) } } } @@ -334,18 +334,18 @@ class GroupCoordinator(val brokerId: Int, def handleHeartbeat(groupId: String, memberId: String, generationId: Int, - responseCallback: Short => Unit) { + responseCallback: Errors => Unit) { if (!isActive.get) { - responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) + responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) + responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP) } else if (isCoordinatorLoadingInProgress(groupId)) { // the group is still loading, so respond just blindly - responseCallback(Errors.NONE.code) + responseCallback(Errors.NONE) } else { groupManager.getGroup(groupId) match { case None => - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) case Some(group) => group synchronized { @@ -355,37 +355,37 @@ class GroupCoordinator(val brokerId: Int, // from the coordinator metadata; this is likely that the group has migrated to some other // coordinator OR the group is in a transient unstable phase. Let the member retry // joining without the specified member id, - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) case Empty => - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) case AwaitingSync => if (!group.has(memberId)) - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) else - responseCallback(Errors.REBALANCE_IN_PROGRESS.code) + responseCallback(Errors.REBALANCE_IN_PROGRESS) case PreparingRebalance => if (!group.has(memberId)) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) } else if (generationId != group.generationId) { - responseCallback(Errors.ILLEGAL_GENERATION.code) + responseCallback(Errors.ILLEGAL_GENERATION) } else { val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) - responseCallback(Errors.REBALANCE_IN_PROGRESS.code) + responseCallback(Errors.REBALANCE_IN_PROGRESS) } case Stable => if (!group.has(memberId)) { - responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + responseCallback(Errors.UNKNOWN_MEMBER_ID) } else if (generationId != group.generationId) { - responseCallback(Errors.ILLEGAL_GENERATION.code) + responseCallback(Errors.ILLEGAL_GENERATION) } else { val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) - responseCallback(Errors.NONE.code) + responseCallback(Errors.NONE) } } } @@ -397,13 +397,13 @@ class GroupCoordinator(val brokerId: Int, memberId: String, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], - responseCallback: immutable.Map[TopicPartition, Short] => Unit) { + responseCallback: immutable.Map[TopicPartition, Errors] => Unit) { if (!isActive.get) { - responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE)) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP)) } else if (isCoordinatorLoadingInProgress(groupId)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS)) } else { groupManager.getGroup(groupId) match { case None => @@ -413,7 +413,7 @@ class GroupCoordinator(val brokerId: Int, doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback) } else { // or this is a request coming from an older generation. either way, reject the commit - responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION)) } case Some(group) => @@ -426,22 +426,22 @@ class GroupCoordinator(val brokerId: Int, memberId: String, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], - responseCallback: immutable.Map[TopicPartition, Short] => Unit) { + responseCallback: immutable.Map[TopicPartition, Errors] => Unit) { var delayedOffsetStore: Option[DelayedStore] = None group synchronized { if (group.is(Dead)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) } else if (generationId < 0 && group.is(Empty)) { // the group is only using Kafka to store offsets delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId, offsetMetadata, responseCallback) } else if (group.is(AwaitingSync)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS)) } else if (!group.has(memberId)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) } else if (generationId != group.generationId) { - responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION)) } else { val member = group.get(memberId) completeAndScheduleNextHeartbeatExpiration(group, member) @@ -512,7 +512,7 @@ class GroupCoordinator(val brokerId: Int, case PreparingRebalance => for (member <- group.allMemberMetadata) { if (member.awaitingJoinCallback != null) { - member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code)) + member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP)) member.awaitingJoinCallback = null } } @@ -521,7 +521,7 @@ class GroupCoordinator(val brokerId: Int, case Stable | AwaitingSync => for (member <- group.allMemberMetadata) { if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code) + member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP) member.awaitingSyncCallback = null } heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId)) @@ -561,7 +561,7 @@ class GroupCoordinator(val brokerId: Int, private def propagateAssignment(group: GroupMetadata, error: Errors) { for (member <- group.allMemberMetadata) { if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(member.assignment, error.code) + member.awaitingSyncCallback(member.assignment, error) member.awaitingSyncCallback = null // reset the session timeout for members after propagating the member's assignment. @@ -577,14 +577,14 @@ class GroupCoordinator(val brokerId: Int, groupId != null && !groupId.isEmpty } - private def joinError(memberId: String, errorCode: Short): JoinGroupResult = { + private def joinError(memberId: String, error: Errors): JoinGroupResult = { JoinGroupResult( - members=Map.empty, - memberId=memberId, - generationId=0, - subProtocol=GroupCoordinator.NoProtocol, - leaderId=GroupCoordinator.NoLeader, - errorCode=errorCode) + members = Map.empty, + memberId = memberId, + generationId = 0, + subProtocol = GroupCoordinator.NoProtocol, + leaderId = GroupCoordinator.NoLeader, + error = error) } /** @@ -707,12 +707,12 @@ class GroupCoordinator(val brokerId: Int, for (member <- group.allMemberMetadata) { assert(member.awaitingJoinCallback != null) val joinResult = JoinGroupResult( - members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, - memberId=member.memberId, - generationId=group.generationId, - subProtocol=group.protocol, - leaderId=group.leaderId, - errorCode=Errors.NONE.code) + members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, + memberId = member.memberId, + generationId = group.generationId, + subProtocol = group.protocol, + leaderId = group.leaderId, + error = Errors.NONE) member.awaitingJoinCallback(joinResult) member.awaitingJoinCallback = null @@ -814,4 +814,4 @@ case class JoinGroupResult(members: Map[String, Array[Byte]], generationId: Int, subProtocol: String, leaderId: String, - errorCode: Short) + error: Errors)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index c66ce74..a6ed6a9 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -224,7 +224,7 @@ class GroupMetadataManager(val brokerId: Int, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], - responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = { + responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Option[DelayedStore] = { // first filter out partitions with offset metadata size exceeding limit val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) => validateOffsetMetadataLength(offsetAndMetadata.metadata) @@ -254,7 +254,7 @@ class GroupMetadataManager(val brokerId: Int, // the offset and metadata to cache if the append status has no error val status = responseStatus(offsetTopicPartition) - val responseCode = + val response = group synchronized { if (status.error == Errors.NONE) { if (!group.is(Dead)) { @@ -262,7 +262,7 @@ class GroupMetadataManager(val brokerId: Int, group.completePendingOffsetWrite(topicPartition, offsetAndMetadata) } } - Errors.NONE.code + Errors.NONE } else { if (!group.is(Dead)) { filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) => @@ -291,16 +291,16 @@ class GroupMetadataManager(val brokerId: Int, case other => other } - responseError.code + responseError } } // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) => if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) - (topicPartition, responseCode) + (topicPartition, response) else - (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) + (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE) } // finally trigger the callback logic passed from the API layer @@ -315,7 +315,7 @@ class GroupMetadataManager(val brokerId: Int, case None => val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) => - (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code) + (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP) } responseCallback(commitStatus) None http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/coordinator/MemberMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala index 6149276..729e483 100644 --- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala @@ -23,6 +23,8 @@ import kafka.utils.nonthreadsafe import scala.collection.Map +import org.apache.kafka.common.protocol.Errors + case class MemberSummary(memberId: String, clientId: String, @@ -62,7 +64,7 @@ private[coordinator] class MemberMetadata(val memberId: String, var assignment: Array[Byte] = Array.empty[Byte] var awaitingJoinCallback: JoinGroupResult => Unit = null - var awaitingSyncCallback: (Array[Byte], Short) => Unit = null + var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null var latestHeartbeat: Long = -1 var isLeaving: Boolean = false http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/FetchResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/FetchResponse.scala b/core/src/main/scala/kafka/javaapi/FetchResponse.scala index 37db3f7..9c67dd8 100644 --- a/core/src/main/scala/kafka/javaapi/FetchResponse.scala +++ b/core/src/main/scala/kafka/javaapi/FetchResponse.scala @@ -28,7 +28,9 @@ class FetchResponse(private val underlying: kafka.api.FetchResponse) { def hasError = underlying.hasError - def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition) + def error(topic: String, partition: Int) = underlying.error(topic, partition) + + def errorCode(topic: String, partition: Int) = error(topic, partition).code override def equals(other: Any) = canEqual(other) && { val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse] http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala index 0e14758..9871ca0 100644 --- a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala +++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala @@ -22,7 +22,9 @@ import kafka.cluster.BrokerEndPoint class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) { - def errorCode = underlying.errorCode + def error = underlying.error + + def errorCode = error.code def coordinator: BrokerEndPoint = { import kafka.javaapi.Implicits._ http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala index c79f5b6..c348eba 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala @@ -20,16 +20,18 @@ package kafka.javaapi import java.nio.ByteBuffer import kafka.common.TopicAndPartition +import org.apache.kafka.common.protocol.Errors import scala.collection.JavaConverters._ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) { - def errors: java.util.Map[TopicAndPartition, Short] = underlying.commitStatus.asJava + def errors: java.util.Map[TopicAndPartition, Errors] = underlying.commitStatus.asJava def hasError = underlying.hasError - def errorCode(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition) + def error(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition) + def errorCode(topicAndPartition: TopicAndPartition) = error(topicAndPartition).code } object OffsetCommitResponse { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/OffsetResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala index 8b1847e..42ee2ab 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala @@ -23,10 +23,10 @@ class OffsetResponse(private val underlying: kafka.api.OffsetResponse) { def hasError = underlying.hasError - - def errorCode(topic: String, partition: Int) = + def error(topic: String, partition: Int) = underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).error + def errorCode(topic: String, partition: Int) = error(topic, partition).code def offsets(topic: String, partition: Int) = underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/TopicMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index 4e2631f..c9ec48a 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -17,6 +17,7 @@ package kafka.javaapi import kafka.cluster.BrokerEndPoint +import org.apache.kafka.common.protocol.Errors import scala.collection.JavaConverters._ private[javaapi] object MetadataListImplicits { @@ -35,7 +36,9 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) { underlying.partitionsMetadata } - def errorCode: Short = underlying.errorCode + def error = underlying.error + + def errorCode = error.code def sizeInBytes: Int = underlying.sizeInBytes @@ -55,7 +58,9 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) { def isr: java.util.List[BrokerEndPoint] = underlying.isr.asJava - def errorCode: Short = underlying.errorCode + def error = underlying.error + + def errorCode = error.code def sizeInBytes: Int = underlying.sizeInBytes http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 97289a1..e77a50c 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -56,8 +56,8 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, } val partitionMetadata = metadata.partitionsMetadata if(partitionMetadata.isEmpty) { - if(metadata.errorCode != Errors.NONE.code) { - throw new KafkaException(Errors.forCode(metadata.errorCode).exception) + if(metadata.error != Errors.NONE) { + throw new KafkaException(metadata.error.exception) } else { throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata)) } @@ -85,14 +85,14 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, // throw partition specific exception topicsMetadata.foreach(tmd =>{ trace("Metadata for topic %s is %s".format(tmd.topic, tmd)) - if(tmd.errorCode == Errors.NONE.code) { + if(tmd.error == Errors.NONE) { topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, Errors.forCode(tmd.errorCode).exception.getClass)) + warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, tmd.error.exception.getClass)) tmd.partitionsMetadata.foreach(pmd =>{ - if (pmd.errorCode != Errors.NONE.code && pmd.errorCode == Errors.LEADER_NOT_AVAILABLE.code) { + if (pmd.error != Errors.NONE && pmd.error == Errors.LEADER_NOT_AVAILABLE) { warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, - Errors.forCode(pmd.errorCode).exception.getClass)) + pmd.error.exception.getClass)) } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata }) }) http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 380b1c8..77c3b7d 100755 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -281,11 +281,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if (response.status.size != producerRequest.data.size) throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest)) if (logger.isTraceEnabled) { - val successfullySentData = response.status.filter(_._2.error == Errors.NONE.code) + val successfullySentData = response.status.filter(_._2.error == Errors.NONE) successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString())))) } - val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq + val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE).toSeq failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1) if(failedTopicPartitions.nonEmpty) { val errorString = failedPartitionsAndStatus @@ -293,7 +293,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition)) .map{ case(topicAndPartition, status) => - topicAndPartition.toString + ": " + Errors.forCode(status.error).exceptionName + topicAndPartition.toString + ": " + status.error.exceptionName }.mkString(",") warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/security/auth/ResourceType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index 4e264ca..9630c82 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -24,21 +24,21 @@ import org.apache.kafka.common.protocol.Errors */ -sealed trait ResourceType extends BaseEnum { def errorCode: Short } +sealed trait ResourceType extends BaseEnum { def error: Errors } case object Cluster extends ResourceType { val name = "Cluster" - val errorCode = Errors.CLUSTER_AUTHORIZATION_FAILED.code + val error = Errors.CLUSTER_AUTHORIZATION_FAILED } case object Topic extends ResourceType { val name = "Topic" - val errorCode = Errors.TOPIC_AUTHORIZATION_FAILED.code + val error = Errors.TOPIC_AUTHORIZATION_FAILED } case object Group extends ResourceType { val name = "Group" - val errorCode = Errors.GROUP_AUTHORIZATION_FAILED.code + val error = Errors.GROUP_AUTHORIZATION_FAILED } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index febe9da..6462968 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -141,7 +141,7 @@ abstract class AbstractFetcherThread(name: String, Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState => // we append to the log if the current offset is defined and it is the same as the offset requested during fetch if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) { - Errors.forCode(partitionData.errorCode) match { + partitionData.error match { case Errors.NONE => try { val records = partitionData.toRecords @@ -259,7 +259,7 @@ object AbstractFetcherThread { } trait PartitionData { - def errorCode: Short + def error: Errors def exception: Option[Throwable] def toRecords: MemoryRecords def highWatermark: Long http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fad75ec..1308216 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -148,10 +148,10 @@ class KafkaApis(val requestChannel: RequestChannel, val leaderAndIsrResponse = if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) - new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava) + new LeaderAndIsrResponse(result.error, result.responseMap.asJava) } else { - val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap - new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava) + val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap + new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava) } requestChannel.sendResponse(new Response(request, leaderAndIsrResponse)) @@ -178,15 +178,15 @@ class KafkaApis(val requestChannel: RequestChannel, // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader, // is not cleared. - result.foreach { case (topicPartition, errorCode) => - if (errorCode == Errors.NONE.code && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) { + result.foreach { case (topicPartition, error) => + if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) { coordinator.handleGroupEmigration(topicPartition.partition) } } - new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava) + new StopReplicaResponse(error, result.asJava) } else { - val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap - new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava) + val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap + new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava) } requestChannel.sendResponse(new RequestChannel.Response(request, response)) @@ -208,9 +208,9 @@ class KafkaApis(val requestChannel: RequestChannel, adminManager.tryCompleteDelayedTopicOperations(topic) } } - new UpdateMetadataResponse(Errors.NONE.code) + new UpdateMetadataResponse(Errors.NONE) } else { - new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code) + new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED) } requestChannel.sendResponse(new Response(request, updateMetadataResponse)) @@ -226,7 +226,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, - Errors.NONE.code, partitionsRemaining) + Errors.NONE, partitionsRemaining) requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse))) } @@ -239,9 +239,9 @@ class KafkaApis(val requestChannel: RequestChannel, // reject the request if not authorized to the group if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) { - val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code) + val error = Errors.GROUP_AUTHORIZATION_FAILED val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition => - (topicPartition, errorCode) + (topicPartition, error) }.toMap val response = new OffsetCommitResponse(results.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, response)) @@ -262,16 +262,16 @@ class KafkaApis(val requestChannel: RequestChannel, } // the callback for sending an offset commit response - def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { - val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++ - unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++ - nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) { + val combinedCommitStatus = commitStatus ++ + unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION) if (isDebugEnabled) - combinedCommitStatus.foreach { case (topicPartition, errorCode) => - if (errorCode != Errors.NONE.code) { + combinedCommitStatus.foreach { case (topicPartition, error) => + if (error != Errors.NONE) { debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") + s"on partition $topicPartition failed due to ${error.exceptionName}") } } val response = new OffsetCommitResponse(combinedCommitStatus.asJava) @@ -287,13 +287,13 @@ class KafkaApis(val requestChannel: RequestChannel, val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) - (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) + (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE) else { zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString) - (topicPartition, Errors.NONE.code) + (topicPartition, Errors.NONE) } } catch { - case e: Throwable => (topicPartition, Errors.forException(e).code) + case e: Throwable => (topicPartition, Errors.forException(e)) } } sendResponseCallback(responseInfo) @@ -458,11 +458,11 @@ class KafkaApis(val requestChannel: RequestChannel, } val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { - case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MemoryRecords.EMPTY)) + case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1, MemoryRecords.EMPTY)) } val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { - case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MemoryRecords.EMPTY)) + case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1, MemoryRecords.EMPTY)) } // the callback for sending a fetch response @@ -485,7 +485,7 @@ class KafkaApis(val requestChannel: RequestChannel, FetchPartitionData(data.error, data.hw, downConvertedRecords) } else data - tp -> new FetchResponse.PartitionData(convertedData.error.code, convertedData.hw, convertedData.records) + tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records) } } @@ -494,9 +494,9 @@ class KafkaApis(val requestChannel: RequestChannel, val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]() mergedPartitionData.foreach { case (topicPartition, data) => - if (data.errorCode != Errors.NONE.code) + if (data.error != Errors.NONE) debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " + - s"on partition $topicPartition failed due to ${Errors.forCode(data.errorCode).exceptionName}") + s"on partition $topicPartition failed due to ${data.error.exceptionName}") fetchedPartitionData.put(topicPartition, data) @@ -584,7 +584,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => - new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava) + new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, List[JLong]().asJava) ) val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) => @@ -609,17 +609,17 @@ class KafkaApis(val requestChannel: RequestChannel, allOffsets } } - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, offsets.map(new JLong(_)).asJava)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(new JLong(_)).asJava)) } catch { // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages // are typically transient and there is no value in logging the entire stack trace for the same case e @ ( _ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException) => debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) case e: Throwable => error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) } } responseMap ++ unauthorizedResponseStatus @@ -635,7 +635,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { - new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, + new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) }) @@ -644,7 +644,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (offsetRequest.duplicatePartitions().contains(topicPartition)) { debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + s"failed because the partition is duplicated in the request.") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code, + (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)) } else { @@ -671,7 +671,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset)) } catch { // NOTE: These exceptions are special cased since these error messages are typically transient or the client // would have received a clear exception and there is no value in logging the entire stack trace for the same @@ -680,12 +680,12 @@ class KafkaApis(val requestChannel: RequestChannel, _ : UnsupportedForMessageFormatException) => debug(s"Offset request with correlation id $correlationId from client $clientId on " + s"partition $topicPartition failed due to ${e.getMessage}") - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)) case e: Throwable => error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)) } @@ -805,7 +805,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responsesForNonExistentTopics = nonExistentTopics.map { topic => if (topic == Topic.GroupMetadataTopicName) { val topicMetadata = createGroupMetadataTopic() - if (topicMetadata.error() == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) { + if (topicMetadata.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) { new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic), java.util.Collections.emptyList()) } else topicMetadata @@ -973,7 +973,7 @@ class KafkaApis(val requestChannel: RequestChannel, val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest] if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) { - val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode) + val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode) requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) } else { val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId) @@ -982,7 +982,7 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName) val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) { - new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode) + new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala .find(_.partition == partition) @@ -990,9 +990,9 @@ class KafkaApis(val requestChannel: RequestChannel, coordinatorEndpoint match { case Some(endpoint) if !endpoint.isEmpty => - new GroupCoordinatorResponse(Errors.NONE.code, endpoint) + new GroupCoordinatorResponse(Errors.NONE, endpoint) case _ => - new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode) + new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode) } } @@ -1015,7 +1015,7 @@ class KafkaApis(val requestChannel: RequestChannel, val assignment = ByteBuffer.wrap(member.assignment) new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment) } - groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType, + groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType, summary.protocol, members.asJava) } }.toMap @@ -1030,7 +1030,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else { val (error, groups) = coordinator.handleListGroups() val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } - new ListGroupsResponse(error.code, allGroups.asJava) + new ListGroupsResponse(error, allGroups.asJava) } requestChannel.sendResponse(new RequestChannel.Response(request, responseBody)) } @@ -1041,7 +1041,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a join-group response def sendResponseCallback(joinResult: JoinGroupResult) { val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) } - val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId, + val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.error, joinResult.generationId, joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava) trace("Sending join group response %s for correlation id %d to client %s." @@ -1052,7 +1052,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) { val responseBody = new JoinGroupResponse( request.header.apiVersion, - Errors.GROUP_AUTHORIZATION_FAILED.code, + Errors.GROUP_AUTHORIZATION_FAILED, JoinGroupResponse.UNKNOWN_GENERATION_ID, JoinGroupResponse.UNKNOWN_PROTOCOL, JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId @@ -1079,13 +1079,13 @@ class KafkaApis(val requestChannel: RequestChannel, def handleSyncGroupRequest(request: RequestChannel.Request) { val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest] - def sendResponseCallback(memberState: Array[Byte], errorCode: Short) { - val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState)) + def sendResponseCallback(memberState: Array[Byte], error: Errors) { + val responseBody = new SyncGroupResponse(error, ByteBuffer.wrap(memberState)) requestChannel.sendResponse(new Response(request, responseBody)) } if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) { - sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code) + sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED) } else { coordinator.handleSyncGroup( syncGroupRequest.groupId(), @@ -1101,15 +1101,15 @@ class KafkaApis(val requestChannel: RequestChannel, val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest] // the callback for sending a heartbeat response - def sendResponseCallback(errorCode: Short) { - val response = new HeartbeatResponse(errorCode) + def sendResponseCallback(error: Errors) { + val response = new HeartbeatResponse(error) trace("Sending heartbeat response %s for correlation id %d to client %s." .format(response, request.header.correlationId, request.header.clientId)) requestChannel.sendResponse(new RequestChannel.Response(request, response)) } if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) { - val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code) + val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED) requestChannel.sendResponse(new Response(request, heartbeatResponse)) } else { @@ -1126,15 +1126,15 @@ class KafkaApis(val requestChannel: RequestChannel, val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest] // the callback for sending a leave-group response - def sendResponseCallback(errorCode: Short) { - val response = new LeaveGroupResponse(errorCode) + def sendResponseCallback(error: Errors) { + val response = new LeaveGroupResponse(error) trace("Sending leave group response %s for correlation id %d to client %s." .format(response, request.header.correlationId, request.header.clientId)) requestChannel.sendResponse(new RequestChannel.Response(request, response)) } if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) { - val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code) + val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED) requestChannel.sendResponse(new Response(request, leaveGroupResponse)) } else { // let the coordinator to handle leave-group @@ -1146,7 +1146,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleSaslHandshakeRequest(request: RequestChannel.Request) { - val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE.code, config.saslEnabledMechanisms) + val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms) requestChannel.sendResponse(new RequestChannel.Response(request, response)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 779f489..54431d9 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -439,13 +439,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val clientResponse = networkClient.blockingSendAndReceive(request)(time) val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse] - if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) { + if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) { shutdownSucceeded = true info("Controlled shutdown succeeded") } else { info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(","))) - info("Error code from controller: %d".format(shutdownResponse.errorCode)) + info("Error code from controller: %d".format(shutdownResponse.error.code)) } } catch { @@ -511,14 +511,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP response = channel.receive() val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload()) - if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null && + if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.isEmpty) { shutdownSucceeded = true info ("Controlled shutdown succeeded") } else { info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(","))) - info("Error code from controller: %d".format(shutdownResponse.errorCode)) + info("Error code from controller: %d".format(shutdownResponse.error.code)) } } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 6e6cffa..c99d7c5 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -276,7 +276,7 @@ class ReplicaFetcherThread(name: String, val clientResponse = sendRequest(requestBuilder) val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse] val partitionData = response.responseData.get(topicPartition) - Errors.forCode(partitionData.errorCode) match { + partitionData.error match { case Errors.NONE => if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) partitionData.offset @@ -321,7 +321,7 @@ object ReplicaFetcherThread { private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData { - def errorCode: Short = underlying.errorCode + def error = underlying.error def toRecords: MemoryRecords = { underlying.records.asInstanceOf[MemoryRecords] @@ -329,7 +329,7 @@ object ReplicaFetcherThread { def highWatermark: Long = underlying.highWatermark - def exception: Option[Throwable] = Errors.forCode(errorCode) match { + def exception: Option[Throwable] = error match { case Errors.NONE => None case e => Some(e.exception) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 475b2ed..1cec4a2 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -88,10 +88,10 @@ object LogReadResult { readSize = -1) } -case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Short], errorCode: Short) { +case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Errors], error: Errors) { override def toString = { - "update results: [%s], global error: [%d]".format(responseMap, errorCode) + "update results: [%s], global error: [%d]".format(responseMap, error.code) } } @@ -219,9 +219,9 @@ class ReplicaManager(val config: KafkaConfig, scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS) } - def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short = { + def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors = { stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition") - val errorCode = Errors.NONE.code + val error = Errors.NONE getPartition(topicPartition) match { case Some(_) => if (deletePartition) { @@ -241,26 +241,26 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker") } stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition") - errorCode + error } - def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = { + def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = { replicaStateChangeLock synchronized { - val responseMap = new collection.mutable.HashMap[TopicPartition, Short] + val responseMap = new collection.mutable.HashMap[TopicPartition, Errors] if(stopReplicaRequest.controllerEpoch() < controllerEpoch) { stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d" .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch)) - (responseMap, Errors.STALE_CONTROLLER_EPOCH.code) + (responseMap, Errors.STALE_CONTROLLER_EPOCH) } else { val partitions = stopReplicaRequest.partitions.asScala controllerEpoch = stopReplicaRequest.controllerEpoch // First stop fetchers for all partitions, then stop the corresponding replicas replicaFetcherManager.removeFetcherForPartitions(partitions) for (topicPartition <- partitions){ - val errorCode = stopReplica(topicPartition, stopReplicaRequest.deletePartitions) - responseMap.put(topicPartition, errorCode) + val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions) + responseMap.put(topicPartition, error) } - (responseMap, Errors.NONE.code) + (responseMap, Errors.NONE) } } } @@ -649,12 +649,12 @@ class ReplicaManager(val config: KafkaConfig, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition)) } replicaStateChangeLock synchronized { - val responseMap = new mutable.HashMap[TopicPartition, Short] + val responseMap = new mutable.HashMap[TopicPartition, Errors] if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " + "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) - BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code) + BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH) } else { val controllerId = leaderAndISRRequest.controllerId controllerEpoch = leaderAndISRRequest.controllerEpoch @@ -674,7 +674,7 @@ class ReplicaManager(val config: KafkaConfig, "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(","))) - responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION) } } else { // Otherwise record the error code in response @@ -682,7 +682,7 @@ class ReplicaManager(val config: KafkaConfig, "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d") .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch)) - responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code) + responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH) } } @@ -709,7 +709,7 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.shutdownIdleFetcherThreads() onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) - BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code) + BecomeLeaderOrFollowerResult(responseMap, Errors.NONE) } } } @@ -731,7 +731,7 @@ class ReplicaManager(val config: KafkaConfig, epoch: Int, partitionState: Map[Partition, PartitionState], correlationId: Int, - responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = { + responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = { partitionState.keys.foreach { partition => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partition %s") @@ -739,7 +739,7 @@ class ReplicaManager(val config: KafkaConfig, } for (partition <- partitionState.keys) - responseMap.put(partition.topicPartition, Errors.NONE.code) + responseMap.put(partition.topicPartition, Errors.NONE) val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set() @@ -802,7 +802,7 @@ class ReplicaManager(val config: KafkaConfig, epoch: Int, partitionState: Map[Partition, PartitionState], correlationId: Int, - responseMap: mutable.Map[TopicPartition, Short], + responseMap: mutable.Map[TopicPartition, Errors], metadataCache: MetadataCache) : Set[Partition] = { partitionState.keys.foreach { partition => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + @@ -811,7 +811,7 @@ class ReplicaManager(val config: KafkaConfig, } for (partition <- partitionState.keys) - responseMap.put(partition.topicPartition, Errors.NONE.code) + responseMap.put(partition.topicPartition, Errors.NONE) val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 7cc2ea7..b269966 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -179,10 +179,10 @@ object ConsumerOffsetChecker extends Logging { throw z } } - else if (offsetAndMetadata.error == Errors.NONE.code) + else if (offsetAndMetadata.error == Errors.NONE) offsetMap.put(topicAndPartition, offsetAndMetadata.offset) else { - println("Could not fetch offset for %s due to %s.".format(topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception)) + println("Could not fetch offset for %s due to %s.".format(topicAndPartition, offsetAndMetadata.error.exception)) } } channel.disconnect() http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 7e31ac7..492b304 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -239,7 +239,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = { offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) => - partitionOffsetsResponse.error != Errors.NONE.code + partitionOffsetsResponse.error != Errors.NONE }.mkString } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c9d35af..3285bf2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -105,24 +105,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse] ) - val RequestKeyToErrorCode = Map[ApiKeys, (Nothing) => Short]( - ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code), - ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error.code), - ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors]( + ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2), + ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error), + ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error), + ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error), ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), - ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error.code), - ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()), - ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()), - ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.errorCode()), - ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.errorCode()), - ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.errorCode()), - ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.errorCode()), + ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error), + ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.error), + ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error), + ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error), + ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error), + ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error), + ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error), ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2), - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()), - ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error.code), - ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2.code) + ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error), + ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error), + ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2) ) val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -809,18 +809,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest { topicExists: Boolean = true): AbstractResponse = { val resp = send(request, apiKey) val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractResponse] - val error = Errors.forCode(RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractResponse) => Short](response)) + val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response) val authorizationErrorCodes = resources.flatMap { resourceType => if (resourceType == Topic) { if (isAuthorized) - Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(Topic.errorCode)) + Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error) else if (!isAuthorizedTopicDescribe) Set(Errors.UNKNOWN_TOPIC_OR_PARTITION) else - Set(Errors.forCode(Topic.errorCode)) + Set(Topic.error) } else { - Set(Errors.forCode(resourceType.errorCode)) + Set(resourceType.error) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala index 3cab534..853dad6 100644 --- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala +++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala @@ -91,7 +91,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness { override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = { fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp => - (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code, -1, null))) + (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, -1, null))) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala index ffa3474..4788b4a 100644 --- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala +++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala @@ -45,7 +45,7 @@ class ReplicaVerificationToolTest { } val initialOffset = 4 val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*) - replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE.code(), hw = 20, + replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20, new ByteBufferMessageSet(memoryRecords.buffer))) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/other/kafka/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 324b440..d908175 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -92,7 +92,7 @@ object TestOffsetManager { numCommits.getAndIncrement commitTimer.time { val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) - if (response.commitStatus.exists(_._2 != Errors.NONE.code)) numErrors.getAndIncrement + if (response.commitStatus.exists(_._2 != Errors.NONE)) numErrors.getAndIncrement } offset += 1 } @@ -155,7 +155,7 @@ object TestOffsetManager { fetchTimer.time { val response = OffsetFetchResponse.readFrom(channel.receive().payload()) - if (response.requestInfo.exists(_._2.error != Errors.NONE.code)) { + if (response.requestInfo.exists(_._2.error != Errors.NONE)) { numErrors.getAndIncrement } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index e6090c1..5342dac 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -85,8 +85,8 @@ object SerializationTestUtils { def createTestProducerResponse: ProducerResponse = ProducerResponse(1, Map( - TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001), - TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) + TopicAndPartition(topic1, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 10001), + TopicAndPartition(topic2, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 20001) ), ProducerRequest.CurrentVersion, 100) def createTestFetchRequest: FetchRequest = new FetchRequest(requestInfo = requestInfos.toVector) @@ -100,7 +100,7 @@ object SerializationTestUtils { def createTestOffsetResponse: OffsetResponse = { new OffsetResponse(0, collection.immutable.Map( - TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(Errors.NONE.code, Seq(1000l, 2000l, 3000l, 4000l))) + TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(Errors.NONE, Seq(1000l, 2000l, 3000l, 4000l))) ) } @@ -135,8 +135,8 @@ object SerializationTestUtils { } def createTestOffsetCommitResponse: OffsetCommitResponse = { - new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> Errors.NONE.code, - TopicAndPartition(topic1, 1) -> Errors.NONE.code)) + new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> Errors.NONE, + TopicAndPartition(topic1, 1) -> Errors.NONE)) } def createTestOffsetFetchRequest: OffsetFetchRequest = { @@ -148,16 +148,16 @@ object SerializationTestUtils { def createTestOffsetFetchResponse: OffsetFetchResponse = { new OffsetFetchResponse(collection.immutable.Map( - TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", Errors.NONE.code), - TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - ), errorCode = Errors.NONE.code) + TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", Errors.NONE), + TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION) + ), error = Errors.NONE) } def createConsumerMetadataRequest: GroupCoordinatorRequest = GroupCoordinatorRequest("group 1", clientId = "client 1") def createConsumerMetadataResponse: GroupCoordinatorResponse = { GroupCoordinatorResponse(Some( - brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE.code, 0) + brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE, 0) } } @@ -175,7 +175,7 @@ class RequestResponseSerializationTest extends JUnitSuite { private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse - private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) + private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, 0) @Test def testSerializationAndDeserialization() { @@ -203,13 +203,13 @@ class RequestResponseSerializationTest extends JUnitSuite { @Test def testProduceResponseVersion() { val oldClientResponse = ProducerResponse(1, Map( - TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), - TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001) )) val newClientResponse = ProducerResponse(1, Map( - TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001), - TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001) + TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001), + TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001) ), 1, 100) // new response should have 4 bytes more than the old response since delayTime is an INT32