http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala new file mode 100644 index 0000000..ef94289 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator + +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} +import kafka.log.LogConfig +import kafka.message.UncompressedCodec +import kafka.server._ +import kafka.utils._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.JoinGroupRequest + +import scala.collection.{Map, Seq, immutable} + +case class GroupManagerConfig(groupMinSessionTimeoutMs: Int, + groupMaxSessionTimeoutMs: Int) + +case class JoinGroupResult(members: Map[String, Array[Byte]], + memberId: String, + generationId: Int, + subProtocol: String, + leaderId: String, + errorCode: Short) + +/** + * GroupCoordinator handles general group membership and offset management. + * + * Each Kafka server instantiates a coordinator which is responsible for a set of + * groups. Groups are assigned to coordinators based on their group names. 
+ */ +class GroupCoordinator(val brokerId: Int, + val groupConfig: GroupManagerConfig, + val offsetConfig: OffsetManagerConfig, + private val offsetManager: OffsetManager) extends Logging { + type JoinCallback = JoinGroupResult => Unit + type SyncCallback = (Array[Byte], Short) => Unit + + this.logIdent = "[GroupCoordinator " + brokerId + "]: " + + private val isActive = new AtomicBoolean(false) + + private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null + private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null + private var coordinatorMetadata: CoordinatorMetadata = null + + def this(brokerId: Int, + groupConfig: GroupManagerConfig, + offsetConfig: OffsetManagerConfig, + replicaManager: ReplicaManager, + zkUtils: ZkUtils, + scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig, + new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler)) + + def offsetsTopicConfigs: Properties = { + val props = new Properties + props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString) + props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name) + props + } + + /** + * NOTE: If a group lock and metadataLock are simultaneously needed, + * be sure to acquire the group lock before metadataLock to prevent deadlock + */ + + /** + * Startup logic executed when the server starts up. + */ + def startup() { + info("Starting up.") + heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) + joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId) + coordinatorMetadata = new CoordinatorMetadata(brokerId) + isActive.set(true) + info("Startup complete.") + } + + /** + * Shutdown logic executed when the server shuts down. + * The ordering of actions should be the reverse of the startup process. 
+ */ + def shutdown() { + info("Shutting down.") + isActive.set(false) + offsetManager.shutdown() + coordinatorMetadata.shutdown() + heartbeatPurgatory.shutdown() + joinPurgatory.shutdown() + info("Shutdown complete.") + } + + def handleJoinGroup(groupId: String, + memberId: String, + sessionTimeoutMs: Int, + protocolType: String, + protocols: List[(String, Array[Byte])], + responseCallback: JoinCallback) { + if (!isActive.get) { + responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code)) + } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs || + sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) { + responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code)) + } else { + // only try to create the group if the group is unknown AND + // the member id is UNKNOWN; if a member id is specified but the group + // does not exist, we should reject the request + var group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + } else { + group = coordinatorMetadata.addGroup(groupId, protocolType) + doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback) + } + } else { + doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback) + } + } + } + + private def doJoinGroup(group: GroupMetadata, + memberId: String, + sessionTimeoutMs: Int, + protocolType: String, + protocols: List[(String, Array[Byte])], + responseCallback: JoinCallback) { + group synchronized { + if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) { + // if the new member does not support the group protocol, reject it + responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code)) + } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { + // if the member is trying to register with an unrecognized id, send an error response to let + // it reset its member id and retry + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + } else { + group.currentState match { + case Dead => + // if the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; it is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // joining without a specified member id. + responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) + + case PreparingRebalance => + if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { + addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback) + } else { + val member = group.get(memberId) + updateMemberAndRebalance(group, member, protocols, responseCallback) + } + + case AwaitingSync => + if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { + addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback) + } else { + val member = group.get(memberId) + if (member.matches(protocols)) { + // member is joining with the same metadata (which could be because it failed to + // receive the initial JoinGroup response), so just return the current group information + // for the current generation. 
+ responseCallback(JoinGroupResult( + members = if (memberId == group.leaderId) { + group.currentMemberMetadata + } else { + Map.empty + }, + memberId = memberId, + generationId = group.generationId, + subProtocol = group.protocol, + leaderId = group.leaderId, + errorCode = Errors.NONE.code)) + } else { + // member has changed metadata, so force a rebalance + updateMemberAndRebalance(group, member, protocols, responseCallback) + } + } + + case Stable => + if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { + // if the member id is unknown, register the member to the group + addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback) + } else { + val member = group.get(memberId) + if (memberId == group.leaderId || !member.matches(protocols)) { + // force a rebalance if a member has changed metadata or if the leader sends JoinGroup. + // The latter allows the leader to trigger rebalances for changes affecting assignment + // which do not affect the member metadata (such as topic metadata changes for the consumer) + updateMemberAndRebalance(group, member, protocols, responseCallback) + } else { + // for followers with no actual change to their metadata, just return group information + // for the current generation which will allow them to issue SyncGroup + responseCallback(JoinGroupResult( + members = Map.empty, + memberId = memberId, + generationId = group.generationId, + subProtocol = group.protocol, + leaderId = group.leaderId, + errorCode = Errors.NONE.code)) + } + } + } + + if (group.is(PreparingRebalance)) + joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId)) + } + } + } + + def handleSyncGroup(groupId: String, + generation: Int, + memberId: String, + groupAssignment: Map[String, Array[Byte]], + responseCallback: SyncCallback) { + if (!isActive.get) { + responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code) + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) + responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + else + doSyncGroup(group, generation, memberId, groupAssignment, responseCallback) + } + } + + private def doSyncGroup(group: GroupMetadata, + generationId: Int, + memberId: String, + groupAssignment: Map[String, Array[Byte]], + responseCallback: SyncCallback) { + group synchronized { + if (!group.has(memberId)) { + responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + } else if (generationId != group.generationId) { + responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code) + } else { + group.currentState match { + case Dead => + responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) + + case PreparingRebalance => + responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code) + + case AwaitingSync => + group.get(memberId).awaitingSyncCallback = responseCallback + completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) + + // if this is the leader, then we can transition to stable and + // propagate the assignment to any awaiting members + if (memberId == group.leaderId) { + group.transitionTo(Stable) + propagateAssignment(group, groupAssignment) + } + + case Stable => + // if the group is stable, we just return the current assignment + val memberMetadata = group.get(memberId) + responseCallback(memberMetadata.assignment, Errors.NONE.code) + completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) + } + } + } + } + + 
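Taken together, handleJoinGroup/doJoinGroup and handleSyncGroup/doSyncGroup implement a two-phase rebalance: the join phase collects each member's protocol metadata and elects a leader (only the leader gets the full membership back), and the sync phase delivers the leader's computed assignment to every member. The following self-contained sketch models that handshake outside the broker; it is a minimal illustration, and every type and name in it is an assumption for this example rather than a Kafka API.

object RebalanceSketch {
  // Result of phase one, as seen by a single member: only the leader
  // receives the full member -> metadata map.
  case class JoinResult(memberId: String, generationId: Int, leaderId: String,
                        members: Map[String, Array[Byte]])

  // Phase 1 (JoinGroup): collect everyone's metadata, elect the first joiner
  // as leader, bump the generation, and fan out per-member results.
  def join(generation: Int, joined: List[(String, Array[Byte])]): List[JoinResult] = {
    val leaderId = joined.head._1
    joined.map { case (memberId, _) =>
      val members =
        if (memberId == leaderId) joined.toMap else Map.empty[String, Array[Byte]]
      JoinResult(memberId, generation + 1, leaderId, members)
    }
  }

  // Phase 2 (SyncGroup): the leader ships a full assignment; each member,
  // leader included, gets back only its own slice.
  def sync(assignment: Map[String, Array[Byte]], memberId: String): Array[Byte] =
    assignment.getOrElse(memberId, Array.empty[Byte])

  def main(args: Array[String]): Unit = {
    val joined = List("member-a" -> Array[Byte](1), "member-b" -> Array[Byte](2))
    val results = join(0, joined)
    val leader = results.find(r => r.memberId == r.leaderId).get
    assert(leader.members.size == 2)          // leader sees all metadata
    assert(results.exists(_.members.isEmpty)) // followers see none yet
    val assignment =
      leader.members.keys.map(m => m -> s"partitions-for-$m".getBytes("UTF-8")).toMap
    results.foreach { r =>
      println(s"${r.memberId} -> ${new String(sync(assignment, r.memberId), "UTF-8")}")
    }
  }
}

Note how followers leave phase one with an empty member map and only learn their partitions from the phase-two response; this is what lets the assignment logic live entirely in the leader rather than in the broker.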
def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) { + if (!isActive.get) { + responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + // if the group is no longer present, it means some other thread has just removed the group + // from the coordinator metadata; it is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the consumer retry + // joining without a specified consumer id. + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else { + group synchronized { + if (group.is(Dead)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else if (!group.has(consumerId)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else { + val member = group.get(consumerId) + removeHeartbeatForLeavingMember(group, member) + onMemberFailure(group, member) + responseCallback(Errors.NONE.code) + } + } + } + } + } + + def handleHeartbeat(groupId: String, + memberId: String, + generationId: Int, + responseCallback: Short => Unit) { + if (!isActive.get) { + responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + // if the group is no longer present, it means some other thread has just removed the group + // from the coordinator metadata; it is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // joining without a specified member id. + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else { + group synchronized { + if (group.is(Dead)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else if (!group.is(Stable)) { + responseCallback(Errors.REBALANCE_IN_PROGRESS.code) + } else if (!group.has(memberId)) { + responseCallback(Errors.UNKNOWN_MEMBER_ID.code) + } else if (generationId != group.generationId) { + responseCallback(Errors.ILLEGAL_GENERATION.code) + } else { + val member = group.get(memberId) + completeAndScheduleNextHeartbeatExpiration(group, member) + responseCallback(Errors.NONE.code) + } + } + } + } + } + + def handleCommitOffsets(groupId: String, + memberId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { + if (!isActive.get) { + responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code)) + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + if (generationId < 0) + // the group is not relying on Kafka for partition management, so allow the commit + offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback) + else + // the group has failed over to this coordinator (which will be handled in KAFKA-2017), + // or this is a request coming from an older generation; 
either way, reject the commit + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + } else { + group synchronized { + if (group.is(Dead)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) + } else if (group.is(AwaitingSync)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code)) + } else if (!group.has(memberId)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code)) + } else if (generationId != group.generationId) { + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + } else { + offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback) + } + } + } + } + } + + def handleFetchOffsets(groupId: String, + partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { + if (!isActive.get) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupCoordinatorNotAvailable)}.toMap + } else if (!isCoordinatorForGroup(groupId)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap + } else { + // return offsets blindly regardless of the current group state since the group may be using + // Kafka commit storage without automatic group management + offsetManager.getOffsets(groupId, partitions) + } + } + + def handleGroupImmigration(offsetTopicPartitionId: Int) = { + // TODO we may need to add more logic in KAFKA-2017 + offsetManager.loadOffsetsFromLog(offsetTopicPartitionId) + } + + def handleGroupEmigration(offsetTopicPartitionId: Int) = { + // TODO we may need to add more logic in KAFKA-2017 + offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId) + } + + private def joinError(memberId: String, errorCode: Short): JoinGroupResult = { + JoinGroupResult( + members=Map.empty, + memberId=memberId, + generationId=0, + subProtocol=GroupCoordinator.NoProtocol, + leaderId=GroupCoordinator.NoLeader, + errorCode=errorCode) + } + + private def propagateAssignment(group: GroupMetadata, + assignment: Map[String, Array[Byte]]) { + for (member <- group.allMembers) { + member.assignment = assignment.getOrElse(member.memberId, Array.empty[Byte]) + if (member.awaitingSyncCallback != null) { + member.awaitingSyncCallback(member.assignment, Errors.NONE.code) + member.awaitingSyncCallback = null + } + } + } + + /** + * Complete existing DelayedHeartbeats for the given member and schedule the next one + */ + private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { + // complete current heartbeat expectation + member.latestHeartbeat = SystemTime.milliseconds + val memberKey = ConsumerKey(member.groupId, member.memberId) + heartbeatPurgatory.checkAndComplete(memberKey) + + // reschedule the next heartbeat expiration deadline + val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs + val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs) + heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey)) + } + + private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) { + member.isLeaving = true + val consumerKey = ConsumerKey(member.groupId, member.memberId) + heartbeatPurgatory.checkAndComplete(consumerKey) + } + + private def addMemberAndRebalance(sessionTimeoutMs: Int, + protocols: List[(String, Array[Byte])], + group: GroupMetadata, 
+ callback: JoinCallback) = { + val memberId = group.generateNextMemberId + val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols) + member.awaitingJoinCallback = callback + group.add(member.memberId, member) + maybePrepareRebalance(group) + member + } + + private def updateMemberAndRebalance(group: GroupMetadata, + member: MemberMetadata, + protocols: List[(String, Array[Byte])], + callback: JoinCallback) { + member.supportedProtocols = protocols + member.awaitingJoinCallback = callback + maybePrepareRebalance(group) + } + + private def maybePrepareRebalance(group: GroupMetadata) { + group synchronized { + if (group.canRebalance) + prepareRebalance(group) + } + } + + private def prepareRebalance(group: GroupMetadata) { + // if any members are awaiting sync, cancel their request and have them rejoin + if (group.is(AwaitingSync)) { + for (member <- group.allMembers) { + if (member.awaitingSyncCallback != null) { + member.awaitingSyncCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code) + member.awaitingSyncCallback = null + } + } + } + + group.allMembers.foreach(_.assignment = null) + group.transitionTo(PreparingRebalance) + info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId)) + + val rebalanceTimeout = group.rebalanceTimeout + val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout) + val consumerGroupKey = ConsumerGroupKey(group.groupId) + joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey)) + } + + private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) { + trace("Member %s in group %s has failed".format(member.memberId, group.groupId)) + group.remove(member.memberId) + group.currentState match { + case Dead => + case Stable | AwaitingSync => maybePrepareRebalance(group) + case PreparingRebalance => joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId)) + } + } + + def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = { + group synchronized { + if (group.notYetRejoinedMembers.isEmpty) + forceComplete() + else false + } + } + + def onExpireJoin() { + // TODO: add metrics for restabilize timeouts + } + + def onCompleteJoin(group: GroupMetadata) { + group synchronized { + val failedMembers = group.notYetRejoinedMembers + if (group.isEmpty || !failedMembers.isEmpty) { + failedMembers.foreach { failedMember => + group.remove(failedMember.memberId) + // TODO: cut the socket connection to the client + } + + if (group.isEmpty) { + group.transitionTo(Dead) + info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId)) + coordinatorMetadata.removeGroup(group.groupId) + } + } + if (!group.is(Dead)) { + group.initNextGeneration + info("Stabilized group %s generation %s".format(group.groupId, group.generationId)) + + // trigger the awaiting join group response callback for all the members after rebalancing + for (member <- group.allMembers) { + assert(member.awaitingJoinCallback != null) + val joinResult = JoinGroupResult( + members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, + memberId=member.memberId, + generationId=group.generationId, + subProtocol=group.protocol, + leaderId=group.leaderId, + errorCode=Errors.NONE.code) + + member.awaitingJoinCallback(joinResult) + member.awaitingJoinCallback = null + completeAndScheduleNextHeartbeatExpiration(group, member) + } + } + } + } + + def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, 
heartbeatDeadline: Long, forceComplete: () => Boolean) = { + group synchronized { + if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving) + forceComplete() + else false + } + } + + def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { + group synchronized { + if (!shouldKeepMemberAlive(member, heartbeatDeadline)) + onMemberFailure(group, member) + } + } + + def onCompleteHeartbeat() { + // TODO: add metrics for complete heartbeats + } + + def partitionFor(group: String): Int = offsetManager.partitionFor(group) + + private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) = + member.awaitingJoinCallback != null || + member.awaitingSyncCallback != null || + member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline + + private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) +} + +object GroupCoordinator { + + val NoProtocol = "" + val NoLeader = "" + val OffsetsTopicName = "__consumer_offsets" + + def create(config: KafkaConfig, + zkUtils: ZkUtils, + replicaManager: ReplicaManager, + kafkaScheduler: KafkaScheduler): GroupCoordinator = { + val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + loadBufferSize = config.offsetsLoadBufferSize, + offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, + offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, + offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, + groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs) + + new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler) + } + + def create(config: KafkaConfig, + zkUtils: ZkUtils, + offsetManager: OffsetManager): GroupCoordinator = { + val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + loadBufferSize = config.offsetsLoadBufferSize, + offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, + offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, + offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, + groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs) + + new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager) + } +}
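Before the GroupMetadata file, it is worth pinning down the liveness rule that completeAndScheduleNextHeartbeatExpiration and shouldKeepMemberAlive encode above: every heartbeat moves the member's expiration deadline to latestHeartbeat + sessionTimeoutMs, and when a DelayedHeartbeat fires, the member survives only if a newer heartbeat has pushed its session window past that deadline or it is parked in a join/sync round trip. A toy model of just that rule, under the stated assumptions and not broker code:

object HeartbeatSketch {
  case class Member(var latestHeartbeat: Long, sessionTimeoutMs: Int,
                    var awaitingRebalance: Boolean = false)

  // Mirrors the shape of shouldKeepMemberAlive: a member survives the check
  // if it is parked in a join/sync round trip, or if a fresh heartbeat has
  // pushed its session window past the deadline being tested.
  def shouldKeepAlive(m: Member, heartbeatDeadline: Long): Boolean =
    m.awaitingRebalance || m.latestHeartbeat + m.sessionTimeoutMs > heartbeatDeadline

  def main(args: Array[String]): Unit = {
    val m = Member(latestHeartbeat = 0L, sessionTimeoutMs = 30000)
    val deadline = m.latestHeartbeat + m.sessionTimeoutMs // scheduled expiration
    m.latestHeartbeat = 10000L            // a heartbeat arrives inside the window
    assert(shouldKeepAlive(m, deadline))  // 10000 + 30000 > 30000: keep the member
    assert(!shouldKeepAlive(Member(0L, 30000), 30000L)) // silent member expires
    println("heartbeat liveness rule holds")
  }
}

The second assert shows the boundary is strict: a member whose session window ends exactly at the tested deadline is treated as failed and handed to onMemberFailure.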
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/GroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala new file mode 100644 index 0000000..60ee987 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.utils.nonthreadsafe + +import java.util.UUID + +import org.apache.kafka.common.protocol.Errors + +import collection.mutable + +private[coordinator] sealed trait GroupState { def state: Byte } + +/** + * Group is preparing to rebalance + * + * action: respond to heartbeats with REBALANCE_IN_PROGRESS + * respond to sync group with REBALANCE_IN_PROGRESS + * remove member on leave group request + * park join group requests from new or existing members until all expected members have joined + * allow offset commits from previous generation + * allow offset fetch requests + * transition: some members have joined by the timeout => AwaitingSync + * all members have left the group => Dead + */ +private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 } + +/** + * Group is awaiting state assignment from the leader + * + * action: respond to heartbeats with REBALANCE_IN_PROGRESS + * respond to offset commits with REBALANCE_IN_PROGRESS + * park sync group requests from followers until transition to Stable + * allow offset fetch requests + * transition: sync group with state assignment received from leader => Stable + * join group from new member or existing member with updated metadata => PreparingRebalance + * leave group from existing member => PreparingRebalance + * member failure detected => PreparingRebalance + */ +private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5} + +/** + * Group is stable + * + * action: respond to member heartbeats normally + * respond to sync group from any member with current assignment + * respond to join group from followers with matching metadata with current group metadata + * allow offset commits from member of current generation + * allow offset fetch requests + * transition: member failure detected via heartbeat => PreparingRebalance + * leave group from existing member => PreparingRebalance + * leader join-group received => PreparingRebalance + * follower join-group with new metadata => PreparingRebalance + */ +private[coordinator] case object Stable extends GroupState { val state: Byte = 3 } + +/** + * Group has no more members + * + * action: respond to join group with UNKNOWN_MEMBER_ID + * respond to 
sync group with UNKNOWN_MEMBER_ID + * respond to heartbeat with UNKNOWN_MEMBER_ID + * respond to leave group with UNKNOWN_MEMBER_ID + * respond to offset commit with UNKNOWN_MEMBER_ID + * allow offset fetch requests + * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions + */ +private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } + + +private object GroupMetadata { + private val validPreviousStates: Map[GroupState, Set[GroupState]] = + Map(Dead -> Set(PreparingRebalance), + AwaitingSync -> Set(PreparingRebalance), + Stable -> Set(AwaitingSync), + PreparingRebalance -> Set(Stable, AwaitingSync)) +} + +/** + * Group contains the following metadata: + * + * Membership metadata: + * 1. Members registered in this group + * 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers) + * 3. Protocol metadata associated with group members + * + * State metadata: + * 1. group state + * 2. generation id + * 3. leader id + */ +@nonthreadsafe +private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) { + + private val members = new mutable.HashMap[String, MemberMetadata] + private var state: GroupState = Stable + var generationId = 0 + var leaderId: String = null + var protocol: String = null + + def is(groupState: GroupState) = state == groupState + def not(groupState: GroupState) = state != groupState + def has(memberId: String) = members.contains(memberId) + def get(memberId: String) = members(memberId) + + def add(memberId: String, member: MemberMetadata) { + assert(supportsProtocols(member.protocols)) + + if (leaderId == null) + leaderId = memberId + members.put(memberId, member) + } + + def remove(memberId: String) { + members.remove(memberId) + if (memberId == leaderId) { + leaderId = if (members.isEmpty) { + null + } else { + members.keys.head + } + } + } + + def currentState = state + + def isEmpty = members.isEmpty + + def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList + + def allMembers = members.values.toList + + def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) => + timeout.max(member.sessionTimeoutMs) + } + + // TODO: decide if ids should be predictable or random + def generateNextMemberId = UUID.randomUUID().toString + + def canRebalance = state == Stable || state == AwaitingSync + + def transitionTo(groupState: GroupState) { + assertValidTransition(groupState) + state = groupState + } + + def selectProtocol: String = { + if (members.isEmpty) + throw new IllegalStateException("Cannot select protocol for empty group") + + // select the protocol for this group which is supported by all members + val candidates = candidateProtocols + + // let each member vote for one of the protocols and choose the one with the most votes + val votes: List[(String, Int)] = allMembers + .map(_.vote(candidates)) + .groupBy(identity) + .mapValues(_.size) + .toList + + votes.maxBy(_._2)._1 + } + + private def candidateProtocols = { + // get the set of protocols that are commonly supported by all members + allMembers + .map(_.protocols) + .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols) + } + + def supportsProtocols(memberProtocols: Set[String]) = { + isEmpty || (memberProtocols & candidateProtocols).nonEmpty + } + + def initNextGeneration = { + assert(notYetRejoinedMembers == List.empty[MemberMetadata]) + generationId += 1 + protocol = selectProtocol + transitionTo(AwaitingSync) + } + + def 
currentMemberMetadata: Map[String, Array[Byte]] = { + if (is(Dead) || is(PreparingRebalance)) + throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state)) + members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap + } + + private def assertValidTransition(targetState: GroupState) { + if (!GroupMetadata.validPreviousStates(targetState).contains(state)) + throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" + .format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/MemberMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala new file mode 100644 index 0000000..7f7df9a --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import java.util + +import kafka.utils.nonthreadsafe + +import scala.collection.Map + +/** + * Member metadata contains the following metadata: + * + * Heartbeat metadata: + * 1. negotiated heartbeat session timeout + * 2. timestamp of the latest heartbeat + * + * Protocol metadata: + * 1. the list of supported protocols (ordered by preference) + * 2. the metadata associated with each protocol + * + * In addition, it also contains the following state information: + * + * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state, + * its rebalance callback will be kept in the metadata if the + * member has sent the join group request + * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback + * is kept in metadata until the leader provides the group assignment + * and the group transitions to stable + */ +@nonthreadsafe +private[coordinator] class MemberMetadata(val memberId: String, + val groupId: String, + val sessionTimeoutMs: Int, + var supportedProtocols: List[(String, Array[Byte])]) { + + var assignment: Array[Byte] = null + var awaitingJoinCallback: JoinGroupResult => Unit = null + var awaitingSyncCallback: (Array[Byte], Short) => Unit = null + var latestHeartbeat: Long = -1 + var isLeaving: Boolean = false + + def protocols = supportedProtocols.map(_._1).toSet + + /** + * Get metadata corresponding to the provided protocol. 
+ */ + def metadata(protocol: String): Array[Byte] = { + supportedProtocols.find(_._1 == protocol) match { + case Some((_, metadata)) => metadata + case None => + throw new IllegalArgumentException("Member does not support protocol") + } + } + + /** + * Check if the provided protocol metadata matches the currently stored metadata. + */ + def matches(protocols: List[(String, Array[Byte])]): Boolean = { + if (protocols.size != this.supportedProtocols.size) + return false + + for (i <- 0 until protocols.size) { + val p1 = protocols(i) + val p2 = supportedProtocols(i) + if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2)) + return false + } + return true + } + + /** + * Vote for one of the potential group protocols. This takes into account the protocol preference as + * indicated by the order of supported protocols and returns the first one also contained in the set + */ + def vote(candidates: Set[String]): String = { + supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match { + case Some((protocol, _)) => protocol + case None => + throw new IllegalArgumentException("Member does not support any of the candidate protocols") + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala deleted file mode 100644 index 8499bf8..0000000 --- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.common.TopicAndPartition -import kafka.utils.CoreUtils - -private[coordinator] trait PartitionAssignor { - /** - * Assigns partitions to consumers in a group. - * @return A mapping from consumer to assigned partitions. 
- */ - def assign(topicsPerConsumer: Map[String, Set[String]], - partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] - - protected def fill[K, V](vsPerK: Map[K, Set[V]], expectedKs: Set[K]): Map[K, Set[V]] = { - val unfilledKs = expectedKs -- vsPerK.keySet - vsPerK ++ unfilledKs.map(k => (k, Set.empty[V])) - } - - protected def aggregate[K, V](pairs: Seq[(K, V)]): Map[K, Set[V]] = { - pairs - .groupBy { case (k, v) => k } - .map { case (k, kvPairs) => (k, kvPairs.map(_._2).toSet) } - } - - protected def invert[K, V](vsPerK: Map[K, Set[V]]): Map[V, Set[K]] = { - val vkPairs = vsPerK.toSeq.flatMap { case (k, vs) => vs.map(v => (v, k)) } - aggregate(vkPairs) - } -} - -private[coordinator] object PartitionAssignor { - val strategies = Set("range", "roundrobin") - - def createInstance(strategy: String) = strategy match { - case "roundrobin" => new RoundRobinAssignor() - case _ => new RangeAssignor() - } -} - -/** - * The roundrobin assignor lays out all the available partitions and all the available consumers. It - * then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer - * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts - * will be within a delta of exactly one across all consumers.) - * - * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, - * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. - * - * The assignment will be: - * C0 -> [t0p0, t0p2, t1p1] - * C1 -> [t0p1, t1p0, t1p2] - */ -private[coordinator] class RoundRobinAssignor extends PartitionAssignor { - override def assign(topicsPerConsumer: Map[String, Set[String]], - partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = { - val consumers = topicsPerConsumer.keys.toSeq.sorted - val topics = topicsPerConsumer.values.flatten.toSeq.distinct.sorted - - val allTopicPartitions = topics.flatMap { topic => - val numPartitionsForTopic = partitionsPerTopic(topic) - (0 until numPartitionsForTopic).map(partition => TopicAndPartition(topic, partition)) - } - - var consumerAssignor = CoreUtils.circularIterator(consumers) - val consumerPartitionPairs = allTopicPartitions.map { topicAndPartition => - consumerAssignor = consumerAssignor.dropWhile(consumerId => !topicsPerConsumer(consumerId).contains(topicAndPartition.topic)) - val consumer = consumerAssignor.next() - (consumer, topicAndPartition) - } - fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet) - } -} - -/** - * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order - * and the consumers in lexicographic order. We then divide the number of partitions by the total number of - * consumers to determine the number of partitions to assign to each consumer. If it does not evenly - * divide, then the first few consumers will have one extra partition. - * - * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, - * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. 
- * - * The assignment will be: - * C0 -> [t0p0, t0p1, t1p0, t1p1] - * C1 -> [t0p2, t1p2] - */ -private[coordinator] class RangeAssignor extends PartitionAssignor { - override def assign(topicsPerConsumer: Map[String, Set[String]], - partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = { - val consumersPerTopic = invert(topicsPerConsumer) - val consumerPartitionPairs = consumersPerTopic.toSeq.flatMap { case (topic, consumersForTopic) => - val numPartitionsForTopic = partitionsPerTopic(topic) - - val numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size - val consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size - - consumersForTopic.toSeq.sorted.zipWithIndex.flatMap { case (consumerForTopic, consumerIndex) => - val startPartition = numPartitionsPerConsumer * consumerIndex + consumerIndex.min(consumersWithExtraPartition) - val numPartitions = numPartitionsPerConsumer + (if (consumerIndex + 1 > consumersWithExtraPartition) 0 else 1) - - // The first few consumers pick up an extra partition, if any. - (startPartition until startPartition + numPartitions) - .map(partition => (consumerForTopic, TopicAndPartition(topic, partition))) - } - } - fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet) - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala deleted file mode 100644 index 4345a8e..0000000 --- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.javaapi - -import java.nio.ByteBuffer -import kafka.cluster.BrokerEndPoint - -class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) { - - def errorCode = underlying.errorCode - - def coordinator: BrokerEndPoint = { - import kafka.javaapi.Implicits._ - underlying.coordinatorOpt - } - - override def equals(other: Any) = canEqual(other) && { - val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.ConsumerMetadataResponse] - this.underlying.equals(otherConsumerMetadataResponse.underlying) - } - - def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.ConsumerMetadataResponse] - - override def hashCode = underlying.hashCode - - override def toString = underlying.toString - -} - -object ConsumerMetadataResponse { - def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer)) -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala new file mode 100644 index 0000000..b94aa01 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.javaapi + +import java.nio.ByteBuffer +import kafka.cluster.BrokerEndPoint + +class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) { + + def errorCode = underlying.errorCode + + def coordinator: BrokerEndPoint = { + import kafka.javaapi.Implicits._ + underlying.coordinatorOpt + } + + override def equals(other: Any) = canEqual(other) && { + val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse] + this.underlying.equals(otherConsumerMetadataResponse.underlying) + } + + def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse] + + override def hashCode = underlying.hashCode + + override def toString = underlying.toString + +} + +object GroupMetadataResponse { + def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer)) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/security/auth/ResourceType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index 3b8312d..ceb6348 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -33,8 +33,8 @@ case object Topic extends ResourceType { val name = "Topic" } -case object ConsumerGroup extends ResourceType { - val name = "ConsumerGroup" +case object Group extends ResourceType { + val name = "Group" } @@ -45,5 +45,5 @@ object ResourceType { rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } - def values: Seq[ResourceType] = List(Cluster, Topic, ConsumerGroup) + def values: Seq[ResourceType] = List(Cluster, Topic, Group) } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6acab8d..c80bd46 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,24 +17,26 @@ package kafka.server -import kafka.message.MessageSet -import kafka.security.auth.Topic -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.TopicPartition -import kafka.api._ +import java.nio.ByteBuffer + import kafka.admin.AdminUtils +import kafka.api._ import kafka.common._ import kafka.controller.KafkaController -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.{GroupCoordinator, JoinGroupResult} import kafka.log._ +import kafka.message.MessageSet import kafka.network._ import kafka.network.RequestChannel.{Session, Response} -import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend} -import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging} -import scala.collection._ +import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write} +import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient -import 
kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Describe, Resource, Topic, Operation, ConsumerGroup} +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse} +import org.apache.kafka.common.utils.Utils + +import scala.collection._ /** @@ -42,7 +44,7 @@ import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Desc */ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, - val coordinator: ConsumerCoordinator, + val coordinator: GroupCoordinator, val controller: KafkaController, val zkUtils: ZkUtils, val brokerId: Int, @@ -73,10 +75,11 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) - case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) + case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request) case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request) case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request) case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request) + case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -114,12 +117,12 @@ class KafkaApis(val requestChannel: RequestChannel, // for each new leader or follower, call coordinator to handle // consumer group migration result.updatedLeaders.foreach { case partition => - if (partition.topic == ConsumerCoordinator.OffsetsTopicName) + if (partition.topic == GroupCoordinator.OffsetsTopicName) coordinator.handleGroupImmigration(partition.partitionId) } result.updatedFollowers.foreach { case partition => partition.leaderReplicaIdOpt.foreach { leaderReplica => - if (partition.topic == ConsumerCoordinator.OffsetsTopicName && + if (partition.topic == GroupCoordinator.OffsetsTopicName && leaderReplica == brokerId) coordinator.handleGroupEmigration(partition.partitionId) } @@ -188,7 +191,7 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { case (topicAndPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) && - authorize(request.session, Read, new Resource(ConsumerGroup, offsetCommitRequest.groupId)) + authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId)) } // the callback for sending an offset commit response @@ -268,7 +271,7 @@ class KafkaApis(val requestChannel: RequestChannel, // call coordinator to handle commit offset coordinator.handleCommitOffsets( offsetCommitRequest.groupId, - offsetCommitRequest.consumerId, + offsetCommitRequest.memberId, offsetCommitRequest.groupGenerationId, offsetData, sendResponseCallback) @@ -526,9 +529,9 @@ class KafkaApis(val requestChannel: RequestChannel, if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == ConsumerCoordinator.OffsetsTopicName || 
config.autoCreateTopicsEnable) {
+    if (topic == GroupCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
       try {
-        if (topic == ConsumerCoordinator.OffsetsTopicName) {
+        if (topic == GroupCoordinator.OffsetsTopicName) {
           val aliveBrokers = metadataCache.getAliveBrokers
           val offsetsTopicReplicationFactor =
             if (aliveBrokers.length > 0)
@@ -610,7 +613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
       authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
-        authorize(request.session, Read, new Resource(ConsumerGroup, offsetFetchRequest.groupId))
+        authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))
     }

     val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
@@ -659,29 +662,29 @@ class KafkaApis(val requestChannel: RequestChannel,
   /*
    * Handle a consumer metadata request
    */
-  def handleConsumerMetadataRequest(request: RequestChannel.Request) {
-    val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
+  def handleGroupMetadataRequest(request: RequestChannel.Request) {
+    val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]

-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group))) {
-      val response = ConsumerMetadataResponse(None, ErrorMapping.AuthorizationCode, consumerMetadataRequest.correlationId)
+    if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
+      val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
     } else {
-      val partition = coordinator.partitionFor(consumerMetadataRequest.group)
+      val partition = coordinator.partitionFor(groupMetadataRequest.group)

-      //get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head
+      // get metadata (and create the topic if necessary)
+      val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.OffsetsTopicName), request.securityProtocol).head

-      val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
+      val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)

       val response = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
         partitionMetadata.leader.map { leader =>
-          ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
+          GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
         }.getOrElse(errorResponse)
       }.getOrElse(errorResponse)

       trace("Sending consumer metadata %s for correlation id %d to client %s."
-        .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
+        .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
   }
@@ -690,39 +693,65 @@ class KafkaApis(val requestChannel: RequestChannel,
     import JavaConversions._

     val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
-    val respHeader = new ResponseHeader(request.header.correlationId)
+    val responseHeader = new ResponseHeader(request.header.correlationId)

     // the callback for sending a join-group response
-    def sendResponseCallback(partitions: Set[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) {
-      val partitionList = if (errorCode == ErrorMapping.NoError)
-        partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
-      else
-        List.empty.toBuffer
-
-      val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList)
-
+    def sendResponseCallback(joinResult: JoinGroupResult) {
+      val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
+      val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
+        joinResult.memberId, joinResult.leaderId, members)
       trace("Sending join group response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     }

-    // ensure that the client is authorized to join the group and read from all subscribed topics
-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, joinGroupRequest.groupId())) ||
-      joinGroupRequest.topics().exists(topic => !authorize(request.session, Read, new Resource(Topic, topic)))) {
-      val responseBody = new JoinGroupResponse(ErrorMapping.AuthorizationCode, 0, joinGroupRequest.consumerId(), List.empty[TopicPartition])
-      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+    if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
+      val responseBody = new JoinGroupResponse(
+        ErrorMapping.AuthorizationCode,
+        JoinGroupResponse.UNKNOWN_GENERATION_ID,
+        JoinGroupResponse.UNKNOWN_PROTOCOL,
+        JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+        JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+        Map.empty[String, ByteBuffer])
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
     } else {
       // let the coordinator to handle join-group
+      val protocols = joinGroupRequest.groupProtocols().map(protocol =>
+        (protocol.name, Utils.toArray(protocol.metadata))).toList
       coordinator.handleJoinGroup(
         joinGroupRequest.groupId(),
-        joinGroupRequest.consumerId(),
-        joinGroupRequest.topics().toSet,
+        joinGroupRequest.memberId(),
         joinGroupRequest.sessionTimeout(),
-        joinGroupRequest.strategy(),
+        joinGroupRequest.protocolType(),
+        protocols,
         sendResponseCallback)
     }
   }

+  def handleSyncGroupRequest(request: RequestChannel.Request) {
+    import JavaConversions._
+
+    val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
+
+    def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
+      val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
+      val responseHeader = new ResponseHeader(request.header.correlationId)
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    }
+
+    if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
+      sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode)
+    } else {
+      coordinator.handleSyncGroup(
+        syncGroupRequest.groupId(),
+        syncGroupRequest.generationId(),
+        syncGroupRequest.memberId(),
+        syncGroupRequest.groupAssignment().mapValues(Utils.toArray(_)),
+        sendResponseCallback
+      )
+    }
+  }
+
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
     val respHeader = new ResponseHeader(request.header.correlationId)
@@ -735,7 +764,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
     }

-    if (!authorize(request.session, Read, new Resource(ConsumerGroup, heartbeatRequest.groupId))) {
+    if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
       val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
       requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
     }
@@ -743,7 +772,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // let the coordinator to handle heartbeat
       coordinator.handleHeartbeat(
         heartbeatRequest.groupId(),
-        heartbeatRequest.consumerId(),
+        heartbeatRequest.memberId(),
         heartbeatRequest.groupGenerationId(),
         sendResponseCallback)
     }
@@ -788,11 +817,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
     }

-    // let the coordinator to handle leave-group
-    coordinator.handleLeaveGroup(
-      leaveGroupRequest.groupId(),
-      leaveGroupRequest.consumerId(),
-      sendResponseCallback)
+    if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
+      val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode)
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
+    } else {
+      // let the coordinator handle leave-group
+      coordinator.handleLeaveGroup(
+        leaveGroupRequest.groupId(),
+        leaveGroupRequest.consumerId(),
+        sendResponseCallback)
+    }
   }

   def close() {
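Under the new membership API, a join-group request no longer carries a topic set and an assignment strategy string; each member submits a list of (protocol name, opaque metadata) pairs, and the coordinator picks a protocol that every member supports. A minimal sketch of building such a list, assuming for illustration only that the metadata is a comma-separated topic list rather than the consumer's real binary format:

    import java.nio.charset.StandardCharsets

    // Hypothetical metadata encoding; the real consumer protocol serializes
    // the subscription into a versioned binary schema.
    def encodeSubscription(topics: List[String]): Array[Byte] =
      topics.mkString(",").getBytes(StandardCharsets.UTF_8)

    // One entry per supported protocol; the coordinator matches on the name
    // and treats the metadata bytes as a black box.
    val protocols: List[(String, Array[Byte])] = List(
      ("range", encodeSubscription(List("topicA", "topicB"))),
      ("roundrobin", encodeSubscription(List("topicA", "topicB"))))

The selected sub-protocol and the members' raw metadata come back in the JoinGroupResult above, which is what allows assignment logic to live on the client side and flow through the new sync-group round trip.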
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 194ee9c..b054f48 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -277,9 +277,9 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
-  /** ********* Consumer coordinator configuration ***********/
-  val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
-  val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
+  /** ********* Group coordinator configuration ***********/
+  val GroupMinSessionTimeoutMsProp = "group.min.session.timeout.ms"
+  val GroupMaxSessionTimeoutMsProp = "group.max.session.timeout.ms"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
   val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -619,8 +619,8 @@ object KafkaConfig {
       .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)

       /** ********* Consumer coordinator configuration ***********/
-      .define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
-      .define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
+      .define(GroupMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
+      .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)

       /** ********* Offset management configuration ***********/
       .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
@@ -799,9 +799,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
   val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)

-  /** ********* Consumer coordinator configuration ***********/
-  val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp)
-  val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp)
+  /** ********* Group coordinator configuration ***********/
+  val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
+  val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)

   /** ********* Offset management configuration ***********/
   val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
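With the rename, the broker-side bounds on client session timeouts move to group.* property names, while the backing Defaults.ConsumerMinSessionTimeoutMs/MaxSessionTimeoutMs values are untouched. A sketch of broker Properties using the new names (the values shown are illustrative, not defaults taken from this patch):

    import java.util.Properties

    val brokerProps = new Properties()
    // lower bound a joining member's session timeout may request
    brokerProps.put("group.min.session.timeout.ms", "6000")
    // upper bound a joining member's session timeout may request
    brokerProps.put("group.max.session.timeout.ms", "30000")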
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------

diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index beea83a..84d48cb 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -50,7 +50,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{ConsumerCoordinator}
+import kafka.coordinator.{GroupManagerConfig, GroupCoordinator}

 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -119,7 +119,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null

-  var consumerCoordinator: ConsumerCoordinator = null
+  var consumerCoordinator: GroupCoordinator = null

   var kafkaController: KafkaController = null

@@ -187,7 +187,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController.startup()

         /* start kafka coordinator */
-        consumerCoordinator = ConsumerCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
+        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
         consumerCoordinator.startup()

         /* Get the authorizer and initialize it if one is specified.*/

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------

diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index bdc3bb6..967dc6f 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
 import kafka.api.ProducerResponseStatus
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator

 import scala.Some
 import scala.collection._
@@ -144,9 +144,9 @@ class OffsetManager(val config: OffsetManagerConfig,
     // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
     // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
     tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-      val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+      val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
       partitionOpt.map { partition =>
-        val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+        val appendPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
         val messages = tombstones.map(_._2).toSeq

         trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -225,7 +225,7 @@ class OffsetManager(val config: OffsetManagerConfig,
       )
     }.toSeq

-    val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId))
+    val offsetTopicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, partitionFor(groupId))

     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -336,7 +336,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    */
  def loadOffsetsFromLog(offsetsPartition: Int) {

-    val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)

     loadingPartitions synchronized {
       if (loadingPartitions.contains(offsetsPartition)) {
@@ -408,7 +408,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   }

   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId)
+    val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, partitionId)

     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -436,7 +436,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     }

     if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-      .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)))
+      .format(numRemoved, TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)))
   }

   def shutdown() {
@@ -448,7 +448,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    * If the topic does not exist, the configured partition count is returned.
    */
   private def getOffsetsTopicPartitionCount = {
-    val topic = ConsumerCoordinator.OffsetsTopicName
+    val topic = GroupCoordinator.OffsetsTopicName
     val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
     if (topicData(topic).nonEmpty)
       topicData(topic).size
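All of these call sites resolve the offsets topic through GroupCoordinator.OffsetsTopicName, and a group is pinned to one partition of that topic via partitionFor(groupId). A minimal sketch of that mapping, with a hypothetical partition-count parameter standing in for the broker's configured offsets topic partition count, and a sign-bit mask playing the role of Utils.abs:

    // Maps a group id onto a partition of the offsets topic; the leader of
    // that partition acts as the group's coordinator.
    def partitionFor(groupId: String, offsetsTopicPartitionCount: Int): Int =
      (groupId.hashCode & 0x7fffffff) % offsetsTopicPartitionCount

    partitionFor("my-test", 50)  // deterministic, so every broker agrees

Because the mapping is a pure function of the group id and the partition count, any broker can answer a group metadata request by computing the partition and returning that partition's leader.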
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 84bebef..f99f0d8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -29,7 +29,7 @@ import org.junit.{Test, Before}
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._

-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator

 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
@@ -50,7 +50,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -154,7 +154,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     val numRecords = 10000
     sendRecords(numRecords)

-    consumer0.subscribe(List(topic))
+    val rebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+        // keep partitions paused in this test so that we can verify the commits based on specific seeks
+        partitions.foreach(consumer0.pause(_))
+      }
+
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
+    }
+
+    consumer0.subscribe(List(topic), rebalanceListener)

     val assignment = Set(tp, tp2)
     TestUtils.waitUntilTrue(() => {
@@ -166,11 +175,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.seek(tp2, 500)

     // change subscription to trigger rebalance
-    consumer0.subscribe(List(topic, topic2))
+    consumer0.subscribe(List(topic, topic2), rebalanceListener)

     val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
     TestUtils.waitUntilTrue(() => {
-      consumer0.poll(50)
+      val records = consumer0.poll(50)
       consumer0.assignment() == newAssignment.asJava
     }, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
@@ -421,9 +430,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.poll(50)

     // get metadata for the topic
-    var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+    var parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
     while(parts == null)
-      parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+      parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala

     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
@@ -436,6 +445,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     consumer0.poll(50)

     assertEquals(2, listener.callsToAssigned)
+
+    // only expect one revocation since revoke is not invoked on initial membership
     assertEquals(2, listener.callsToRevoked)

     consumer0.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index db610c1..f2b0f85 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -19,13 +19,15 @@ import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ShutdownableThread, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownConsumerIdException}
+import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownMemberIdException}
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Test, Before}

 import scala.collection.JavaConversions._

+
+
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
  */
@@ -43,7 +45,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set small enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -108,7 +110,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
       } catch {
         // TODO: should be no need to catch these exceptions once KAFKA-2017 is
         // merged since coordinator fail-over will not cause a rebalance
-        case _: UnknownConsumerIdException | _: IllegalGenerationException =>
+        case _: UnknownMemberIdException | _: IllegalGenerationException =>
       }
     }
     scheduler.shutdown()
@@ -176,4 +178,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     }
     futures.map(_.get)
   }
+
+
 }
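The listener registered in the test above pauses each newly assigned partition so the test can seek to known offsets before any records are fetched. The same pattern, extracted as a standalone sketch against the consumer API as used in this patch (the consumer parameter is a placeholder for any live instance):

    import java.util
    import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConversions._

    // Builds a listener that pauses every partition handed out by a rebalance.
    def pausingListener(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): ConsumerRebalanceListener =
      new ConsumerRebalanceListener {
        override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) =
          partitions.foreach(consumer.pause(_))  // fetching stays suspended until resume()
        override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = ()
      }

It would be registered the same way the test does, e.g. consumer.subscribe(List(topic), pausingListener(consumer)).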
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------

diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 2ec59fb..5741ce2 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -18,16 +18,16 @@ package kafka.api

 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import kafka.utils.TestUtils
 import java.util.Properties
-import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
+
 import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator

 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -60,14 +60,14 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
     consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
     for(i <- 0 until producerCount)
       producers += new KafkaProducer(producerConfig)
-    for(i <- 0 until consumerCount)
+    for(i <- 0 until consumerCount) {
       consumers += new KafkaConsumer(consumerConfig)
+    }

     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName,
+    TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------

diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index bdf7e49..735a3b2 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -101,7 +101,6 @@ class QuotasTest extends KafkaServerTestHarness {
                       classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                       classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
     consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)

     consumers += new KafkaConsumer(consumerProps)
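Both test harnesses drop the explicit "range" value for PARTITION_ASSIGNMENT_STRATEGY_CONFIG, presumably because the strategy is now negotiated through the group protocol above rather than fixed by a short config string, so the default assignor suffices. A minimal consumer setup after this change (the bootstrap address is a placeholder):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)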
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
----------------------------------------------------------------------

diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
deleted file mode 100644
index 1d13d88..0000000
--- a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SslConsumerTest extends BaseConsumerTest {
-  override protected def securityProtocol = SecurityProtocol.SSL
-  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}
