http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala deleted file mode 100644 index ea1c0d0..0000000 --- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.nio.ByteBuffer -import kafka.cluster.BrokerEndPoint -import kafka.common.ErrorMapping - -object ConsumerMetadataResponse { - val CurrentVersion = 0 - - private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1)) - - def readFrom(buffer: ByteBuffer) = { - val correlationId = buffer.getInt - val errorCode = buffer.getShort - val broker = BrokerEndPoint.readFrom(buffer) - val coordinatorOpt = if (errorCode == ErrorMapping.NoError) - Some(broker) - else - None - - ConsumerMetadataResponse(coordinatorOpt, errorCode, correlationId) - } - -} - -case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int) - extends RequestOrResponse() { - - def sizeInBytes = - 4 + /* correlationId */ - 2 + /* error code */ - coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes - - def writeTo(buffer: ByteBuffer) { - buffer.putInt(correlationId) - buffer.putShort(errorCode) - coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer)) - } - - def describe(details: Boolean) = toString -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/GroupMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala new file mode 100644 index 0000000..075ddb5 --- /dev/null +++ b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer + +import kafka.common.ErrorMapping +import kafka.network.{RequestOrResponseSend, RequestChannel} +import kafka.network.RequestChannel.Response + +object GroupMetadataRequest { + val CurrentVersion = 0.shortValue + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer) = { + // envelope + val versionId = buffer.getShort + val correlationId = buffer.getInt + val clientId = ApiUtils.readShortString(buffer) + + // request + val group = ApiUtils.readShortString(buffer) + GroupMetadataRequest(group, versionId, correlationId, clientId) + } + +} + +case class GroupMetadataRequest(group: String, + versionId: Short = GroupMetadataRequest.CurrentVersion, + correlationId: Int = 0, + clientId: String = GroupMetadataRequest.DefaultClientId) + extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) { + + def sizeInBytes = + 2 + /* versionId */ + 4 + /* correlationId */ + ApiUtils.shortStringLength(clientId) + + ApiUtils.shortStringLength(group) + + def writeTo(buffer: ByteBuffer) { + // envelope + buffer.putShort(versionId) + buffer.putInt(correlationId) + ApiUtils.writeShortString(buffer, clientId) + + // consumer metadata request + ApiUtils.writeShortString(buffer, group) + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + // return ConsumerCoordinatorNotAvailable for all uncaught errors + val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) + } + + def describe(details: Boolean) = { + val consumerMetadataRequest = new StringBuilder + consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName) + consumerMetadataRequest.append("; Version: " + versionId) + consumerMetadataRequest.append("; CorrelationId: " + correlationId) + consumerMetadataRequest.append("; ClientId: " + clientId) + consumerMetadataRequest.append("; Group: " + group) + consumerMetadataRequest.toString() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/GroupMetadataResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala new file mode 100644 index 0000000..2d65917 --- /dev/null +++ b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.cluster.BrokerEndPoint +import kafka.common.ErrorMapping + +object GroupMetadataResponse { + val CurrentVersion = 0 + + private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1)) + + def readFrom(buffer: ByteBuffer) = { + val correlationId = buffer.getInt + val errorCode = buffer.getShort + val broker = BrokerEndPoint.readFrom(buffer) + val coordinatorOpt = if (errorCode == ErrorMapping.NoError) + Some(broker) + else + None + + GroupMetadataResponse(coordinatorOpt, errorCode, correlationId) + } + +} + +case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int) + extends RequestOrResponse() { + + def sizeInBytes = + 4 + /* correlationId */ + 2 + /* error code */ + coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(correlationId) + buffer.putShort(errorCode) + coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer)) + } + + def describe(details: Boolean) = toString +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 5b362ef..75067cf 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -41,7 +41,7 @@ object OffsetCommitRequest extends Logging { val clientId = readShortString(buffer) // Read the OffsetRequest - val consumerGroupId = readShortString(buffer) + val groupId = readShortString(buffer) // version 1 and 2 specific fields val groupGenerationId: Int = @@ -50,11 +50,11 @@ object OffsetCommitRequest extends Logging { else org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID - val consumerId: String = + val memberId: String = if (versionId >= 1) readShortString(buffer) else - org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID + org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID // version 2 specific fields val retentionMs: Long = @@ -83,7 +83,7 @@ object OffsetCommitRequest extends Logging { }) }) - OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId, retentionMs) + OffsetCommitRequest(groupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, memberId, retentionMs) } } @@ -93,7 +93,7 @@ case class OffsetCommitRequest(groupId: String, correlationId: Int = 0, clientId: String = OffsetCommitRequest.DefaultClientId, groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID, - consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID, + memberId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID, retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) { @@ -114,7 +114,7 @@ case class OffsetCommitRequest(groupId: String, // version 1 and 2 specific data if (versionId >= 1) { buffer.putInt(groupGenerationId) - writeShortString(buffer, consumerId) + writeShortString(buffer, memberId) } // version 2 or above specific data @@ -142,7 +142,7 @@ case class OffsetCommitRequest(groupId: String, 4 + /* correlationId */ shortStringLength(clientId) + shortStringLength(groupId) + - (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) + + (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(memberId) else 0) + (if (versionId >= 2) 8 /* retention time */ else 0) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { @@ -175,7 +175,7 @@ case class OffsetCommitRequest(groupId: String, offsetCommitRequest.append("; ClientId: " + clientId) offsetCommitRequest.append("; GroupId: " + groupId) offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId) - offsetCommitRequest.append("; ConsumerId: " + consumerId) + offsetCommitRequest.append("; MemberId: " + memberId) offsetCommitRequest.append("; RetentionMs: " + retentionMs) if(details) offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(",")) http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/RequestKeys.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 8a22c1a..669b63a 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -33,10 +33,11 @@ object RequestKeys { val ControlledShutdownKey: Short = 7 val OffsetCommitKey: Short = 8 val OffsetFetchKey: Short = 9 - val ConsumerMetadataKey: Short = 10 + val GroupMetadataKey: Short = 10 val JoinGroupKey: Short = 11 val HeartbeatKey: Short = 12 val LeaveGroupKey: Short = 13 + val SyncGroupKey: Short = 14 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -49,7 +50,7 @@ object RequestKeys { ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom) + GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom) ) def nameForKey(key: Short): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/client/ClientUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 6ae0347..36b5b3b 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -151,9 +151,9 @@ object ClientUtils extends Logging{ if (!queryChannel.isConnected) queryChannel = channelToAnyBroker(zkUtils) debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) - queryChannel.send(ConsumerMetadataRequest(group)) + queryChannel.send(GroupMetadataRequest(group)) val response = queryChannel.receive() - val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.payload()) + val consumerMetadataResponse = GroupMetadataResponse.readFrom(response.payload()) debug("Consumer metadata response: " + consumerMetadataResponse.toString) if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) coordinatorOpt = consumerMetadataResponse.coordinatorOpt http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index deb48b1..bbee894 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -64,8 +64,9 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = object OffsetMetadataAndError { val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code) val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code) - val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code) - val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code) + val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code) + val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code) http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/common/Topic.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index db75d4b..ca41eba 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -18,7 +18,7 @@ package kafka.common import util.matching.Regex -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.GroupCoordinator object Topic { @@ -26,7 +26,7 @@ object Topic { private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") - val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName) + val InternalTopics = Set(GroupCoordinator.OffsetsTopicName) def validate(topic: String) { if (topic.length <= 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index b7af6d6..5b1aead 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -112,9 +112,9 @@ class SimpleConsumer(val host: String, TopicMetadataResponse.readFrom(response.payload()) } - def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = { + def send(request: GroupMetadataRequest): GroupMetadataResponse = { val response = sendRequest(request) - ConsumerMetadataResponse.readFrom(response.payload()) + GroupMetadataResponse.readFrom(response.payload()) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala deleted file mode 100644 index bf23e9b..0000000 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ /dev/null @@ -1,535 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.coordinator - -import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition} -import kafka.message.UncompressedCodec -import kafka.log.LogConfig -import kafka.server._ -import kafka.utils._ -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.JoinGroupRequest - -import org.I0Itec.zkclient.ZkClient -import java.util.concurrent.atomic.AtomicBoolean -import java.util.Properties -import scala.collection.{Map, Seq, immutable} - -case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int, - consumerMaxSessionTimeoutMs: Int) - -/** - * ConsumerCoordinator handles consumer group and consumer offset management. - * - * Each Kafka server instantiates a coordinator which is responsible for a set of - * consumer groups. Consumer groups are assigned to coordinators based on their - * group names. - */ -class ConsumerCoordinator(val brokerId: Int, - val groupConfig: GroupManagerConfig, - val offsetConfig: OffsetManagerConfig, - private val offsetManager: OffsetManager, - zkUtils: ZkUtils) extends Logging { - - this.logIdent = "[ConsumerCoordinator " + brokerId + "]: " - - private val isActive = new AtomicBoolean(false) - - private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null - private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null - private var coordinatorMetadata: CoordinatorMetadata = null - - def this(brokerId: Int, - groupConfig: GroupManagerConfig, - offsetConfig: OffsetManagerConfig, - replicaManager: ReplicaManager, - zkUtils: ZkUtils, - scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig, - new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler), zkUtils) - - def offsetsTopicConfigs: Properties = { - val props = new Properties - props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) - props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name) - props - } - - /** - * NOTE: If a group lock and metadataLock are simultaneously needed, - * be sure to acquire the group lock before metadataLock to prevent deadlock - */ - - /** - * Startup logic executed at the same time when the server starts up. - */ - def startup() { - info("Starting up.") - heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) - rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId) - coordinatorMetadata = new CoordinatorMetadata(brokerId, zkUtils, maybePrepareRebalance) - isActive.set(true) - info("Startup complete.") - } - - /** - * Shutdown logic executed at the same time when server shuts down. - * Ordering of actions should be reversed from the startup process. - */ - def shutdown() { - info("Shutting down.") - isActive.set(false) - offsetManager.shutdown() - coordinatorMetadata.shutdown() - heartbeatPurgatory.shutdown() - rebalancePurgatory.shutdown() - info("Shutdown complete.") - } - - def handleJoinGroup(groupId: String, - consumerId: String, - topics: Set[String], - sessionTimeoutMs: Int, - partitionAssignmentStrategy: String, - responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) { - if (!isActive.get) { - responseCallback(Set.empty, consumerId, 0, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) - } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) - } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) { - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code) - } else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs || - sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) { - responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code) - } else { - // only try to create the group if the group is not unknown AND - // the consumer id is UNKNOWN, if consumer is specified but group does not - // exist we should reject the request - var group = coordinatorMetadata.getGroup(groupId) - if (group == null) { - if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) { - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) - } else { - group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy) - doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) - } - } else { - doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback) - } - } - } - - private def doJoinGroup(group: ConsumerGroupMetadata, - consumerId: String, - topics: Set[String], - sessionTimeoutMs: Int, - partitionAssignmentStrategy: String, - responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) { - group synchronized { - if (group.is(Dead)) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the consumer to retry - // joining without specified consumer id, - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) - } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) { - responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code) - } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) { - // if the consumer trying to register with a un-recognized id, send the response to let - // it reset its consumer id and retry - responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code) - } else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) { - /* - * if an existing consumer sends a JoinGroupRequest with no changes while the group is stable, - * just treat it like a heartbeat and return their currently assigned partitions. - */ - val consumer = group.get(consumerId) - completeAndScheduleNextHeartbeatExpiration(group, consumer) - responseCallback(consumer.assignedTopicPartitions, consumerId, group.generationId, Errors.NONE.code) - } else { - val consumer = if (consumerId == JoinGroupRequest.UNKNOWN_CONSUMER_ID) { - // if the consumer id is unknown, register this consumer to the group - val generatedConsumerId = group.generateNextConsumerId - val consumer = addConsumer(generatedConsumerId, topics, sessionTimeoutMs, group) - maybePrepareRebalance(group) - consumer - } else { - val consumer = group.get(consumerId) - if (topics != consumer.topics) { - // existing consumer changed its subscribed topics - updateConsumer(group, consumer, topics) - maybePrepareRebalance(group) - consumer - } else { - // existing consumer rejoining a group due to rebalance - consumer - } - } - - consumer.awaitingRebalanceCallback = responseCallback - - if (group.is(PreparingRebalance)) - rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId)) - } - } - } - - def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) { - if (!isActive.get) { - responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) - } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) - } else { - val group = coordinatorMetadata.getGroup(groupId) - if (group == null) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the consumer to retry - // joining without specified consumer id, - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else { - group synchronized { - if (group.is(Dead)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else if (!group.has(consumerId)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else { - val consumer = group.get(consumerId) - removeHeartbeatForLeavingConsumer(group, consumer) - onConsumerFailure(group, consumer) - responseCallback(Errors.NONE.code) - if (group.is(PreparingRebalance)) - rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId)) - } - } - } - } - } - - def handleHeartbeat(groupId: String, - consumerId: String, - generationId: Int, - responseCallback: Short => Unit) { - if (!isActive.get) { - responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code) - } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code) - } else { - val group = coordinatorMetadata.getGroup(groupId) - if (group == null) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the consumer to retry - // joining without specified consumer id, - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else { - group synchronized { - if (group.is(Dead)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else if (!group.has(consumerId)) { - responseCallback(Errors.UNKNOWN_CONSUMER_ID.code) - } else if (generationId != group.generationId) { - responseCallback(Errors.ILLEGAL_GENERATION.code) - } else if (!group.is(Stable)) { - responseCallback(Errors.REBALANCE_IN_PROGRESS.code) - } else { - val consumer = group.get(consumerId) - completeAndScheduleNextHeartbeatExpiration(group, consumer) - responseCallback(Errors.NONE.code) - } - } - } - } - } - - def handleCommitOffsets(groupId: String, - consumerId: String, - generationId: Int, - offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], - responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { - if (!isActive.get) { - responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)) - } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code)) - } else { - val group = coordinatorMetadata.getGroup(groupId) - if (group == null) { - if (generationId < 0) - // the group is not relying on Kafka for partition management, so allow the commit - offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) - else - // the group has failed over to this coordinator (which will be handled in KAFKA-2017), - // or this is a request coming from an older generation. either way, reject the commit - responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) - } else { - group synchronized { - if (group.is(Dead)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) - } else if (!group.has(consumerId)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) - } else if (generationId != group.generationId) { - responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) - } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) { - responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code)) - } else { - offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) - } - } - } - } - } - - def handleFetchOffsets(groupId: String, - partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { - if (!isActive.get) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap - } else if (!isCoordinatorForGroup(groupId)) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap - } else { - val group = coordinatorMetadata.getGroup(groupId) - if (group == null) { - // if the group does not exist, it means this group is not relying - // on Kafka for partition management, and hence never send join-group - // request to the coordinator before; in this case blindly fetch the offsets - offsetManager.getOffsets(groupId, partitions) - } else { - group synchronized { - if (group.is(Dead)) { - partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap - } else { - offsetManager.getOffsets(groupId, partitions) - } - } - } - } - } - - def handleGroupImmigration(offsetTopicPartitionId: Int) = { - // TODO we may need to add more logic in KAFKA-2017 - offsetManager.loadOffsetsFromLog(offsetTopicPartitionId) - } - - def handleGroupEmigration(offsetTopicPartitionId: Int) = { - // TODO we may need to add more logic in KAFKA-2017 - offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId) - } - - /** - * Complete existing DelayedHeartbeats for the given consumer and schedule the next one - */ - private def completeAndScheduleNextHeartbeatExpiration(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { - // complete current heartbeat expectation - consumer.latestHeartbeat = SystemTime.milliseconds - val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId) - heartbeatPurgatory.checkAndComplete(consumerKey) - - // reschedule the next heartbeat expiration deadline - val newHeartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs - val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, newHeartbeatDeadline, consumer.sessionTimeoutMs) - heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey)) - } - - private def removeHeartbeatForLeavingConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { - consumer.isLeaving = true - val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId) - heartbeatPurgatory.checkAndComplete(consumerKey) - } - - private def addConsumer(consumerId: String, - topics: Set[String], - sessionTimeoutMs: Int, - group: ConsumerGroupMetadata) = { - val consumer = new ConsumerMetadata(consumerId, group.groupId, topics, sessionTimeoutMs) - val topicsToBind = topics -- group.topics - group.add(consumer.consumerId, consumer) - coordinatorMetadata.bindGroupToTopics(group.groupId, topicsToBind) - consumer - } - - private def removeConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { - group.remove(consumer.consumerId) - val topicsToUnbind = consumer.topics -- group.topics - coordinatorMetadata.unbindGroupFromTopics(group.groupId, topicsToUnbind) - } - - private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) { - val topicsToBind = topics -- group.topics - group.remove(consumer.consumerId) - val topicsToUnbind = consumer.topics -- (group.topics ++ topics) - group.add(consumer.consumerId, consumer) - consumer.topics = topics - coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind) - } - - private def maybePrepareRebalance(group: ConsumerGroupMetadata) { - group synchronized { - if (group.canRebalance) - prepareRebalance(group) - } - } - - private def prepareRebalance(group: ConsumerGroupMetadata) { - group.transitionTo(PreparingRebalance) - info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId)) - - val rebalanceTimeout = group.rebalanceTimeout - val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout) - val consumerGroupKey = ConsumerGroupKey(group.groupId) - rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey)) - } - - private def rebalance(group: ConsumerGroupMetadata) { - assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata]) - - group.transitionTo(Rebalancing) - group.generationId += 1 - - info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId)) - - val assignedPartitionsPerConsumer = reassignPartitions(group) - trace("Rebalance for group %s generation %s has assigned partitions: %s" - .format(group.groupId, group.generationId, assignedPartitionsPerConsumer)) - - group.transitionTo(Stable) - info("Stabilized group %s generation %s".format(group.groupId, group.generationId)) - } - - private def onConsumerFailure(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) { - trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId)) - removeConsumer(group, consumer) - maybePrepareRebalance(group) - } - - private def reassignPartitions(group: ConsumerGroupMetadata) = { - val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy) - val topicsPerConsumer = group.topicsPerConsumer - val partitionsPerTopic = coordinatorMetadata.partitionsPerTopic - val assignedPartitionsPerConsumer = assignor.assign(topicsPerConsumer, partitionsPerTopic) - assignedPartitionsPerConsumer.foreach { case (consumerId, partitions) => - group.get(consumerId).assignedTopicPartitions = partitions - } - assignedPartitionsPerConsumer - } - - def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = { - group synchronized { - if (group.notYetRejoinedConsumers.isEmpty) - forceComplete() - else false - } - } - - def onExpirationRebalance() { - // TODO: add metrics for rebalance timeouts - } - - def onCompleteRebalance(group: ConsumerGroupMetadata) { - group synchronized { - val failedConsumers = group.notYetRejoinedConsumers - if (group.isEmpty || !failedConsumers.isEmpty) { - failedConsumers.foreach { failedConsumer => - removeConsumer(group, failedConsumer) - // TODO: cut the socket connection to the consumer - } - - if (group.isEmpty) { - group.transitionTo(Dead) - info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId)) - coordinatorMetadata.removeGroup(group.groupId, group.topics) - } - } - if (!group.is(Dead)) { - // assign partitions to existing consumers of the group according to the partitioning strategy - rebalance(group) - - // trigger the awaiting join group response callback for all the consumers after rebalancing - for (consumer <- group.allConsumers) { - assert(consumer.awaitingRebalanceCallback != null) - consumer.awaitingRebalanceCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code) - consumer.awaitingRebalanceCallback = null - completeAndScheduleNextHeartbeatExpiration(group, consumer) - } - } - } - } - - def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = { - group synchronized { - if (shouldKeepConsumerAlive(consumer, heartbeatDeadline) || consumer.isLeaving) - forceComplete() - else false - } - } - - def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) { - group synchronized { - if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline)) - onConsumerFailure(group, consumer) - } - } - - def onCompleteHeartbeat() { - // TODO: add metrics for complete heartbeats - } - - def partitionFor(group: String): Int = offsetManager.partitionFor(group) - - private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) = - consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline - - private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) -} - -object ConsumerCoordinator { - - val OffsetsTopicName = "__consumer_offsets" - - def create(config: KafkaConfig, - zkUtils: ZkUtils, - replicaManager: ReplicaManager, - kafkaScheduler: KafkaScheduler): ConsumerCoordinator = { - val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, - loadBufferSize = config.offsetsLoadBufferSize, - offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, - offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions = config.offsetsTopicPartitions, - offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, - offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, - offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, - consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) - - new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler) - } - - def create(config: KafkaConfig, - zkUtils: ZkUtils, - offsetManager: OffsetManager): ConsumerCoordinator = { - val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, - loadBufferSize = config.offsetsLoadBufferSize, - offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, - offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions = config.offsetsTopicPartitions, - offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, - offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, - offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, - consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) - - new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkUtils) - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala deleted file mode 100644 index 0e3657f..0000000 --- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.utils.nonthreadsafe - -import java.util.UUID - -import collection.mutable - -private[coordinator] sealed trait GroupState { def state: Byte } - -/** - * Consumer group is preparing to rebalance - * - * action: respond to heartbeats with an ILLEGAL GENERATION error code - * transition: some consumers have joined by the timeout => Rebalancing - * all consumers have left the group => Dead - */ -private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 } - -/** - * Consumer group is rebalancing - * - * action: compute the group's partition assignment - * send the join-group response with new partition assignment when rebalance is complete - * transition: partition assignment has been computed => Stable - */ -private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 } - -/** - * Consumer group is stable - * - * action: respond to consumer heartbeats normally - * transition: consumer failure detected via heartbeat => PreparingRebalance - * consumer join-group received => PreparingRebalance - * zookeeper topic watcher fired => PreparingRebalance - */ -private[coordinator] case object Stable extends GroupState { val state: Byte = 3 } - -/** - * Consumer group has no more members - * - * action: none - * transition: none - */ -private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } - - -private object ConsumerGroupMetadata { - private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(PreparingRebalance), - Stable -> Set(Rebalancing), - PreparingRebalance -> Set(Stable), - Rebalancing -> Set(PreparingRebalance)) -} - -/** - * Group contains the following metadata: - * - * Membership metadata: - * 1. Consumers registered in this group - * 2. Partition assignment strategy for this group - * - * State metadata: - * 1. group state - * 2. generation id - */ -@nonthreadsafe -private[coordinator] class ConsumerGroupMetadata(val groupId: String, - val partitionAssignmentStrategy: String) { - - private val consumers = new mutable.HashMap[String, ConsumerMetadata] - private var state: GroupState = Stable - var generationId = 0 - - def is(groupState: GroupState) = state == groupState - def has(consumerId: String) = consumers.contains(consumerId) - def get(consumerId: String) = consumers(consumerId) - - def add(consumerId: String, consumer: ConsumerMetadata) { - consumers.put(consumerId, consumer) - } - - def remove(consumerId: String) { - consumers.remove(consumerId) - } - - def isEmpty = consumers.isEmpty - - def topicsPerConsumer = consumers.mapValues(_.topics).toMap - - def topics = consumers.values.flatMap(_.topics).toSet - - def notYetRejoinedConsumers = consumers.values.filter(_.awaitingRebalanceCallback == null).toList - - def allConsumers = consumers.values.toList - - def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) => - timeout.max(consumer.sessionTimeoutMs) - } - - // TODO: decide if ids should be predictable or random - def generateNextConsumerId = UUID.randomUUID().toString - - def canRebalance = state == Stable - - def transitionTo(groupState: GroupState) { - assertValidTransition(groupState) - state = groupState - } - - private def assertValidTransition(targetState: GroupState) { - if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state)) - throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" - .format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala deleted file mode 100644 index 64ed4a5..0000000 --- a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.common.TopicAndPartition -import kafka.utils.nonthreadsafe - -/** - * Consumer metadata contains the following metadata: - * - * Heartbeat metadata: - * 1. negotiated heartbeat session timeout - * 2. timestamp of the latest heartbeat - * - * Subscription metadata: - * 1. subscribed topics - * 2. assigned partitions for the subscribed topics - * - * In addition, it also contains the following state information: - * - * 1. Awaiting rebalance callback: when the consumer group is in the prepare-rebalance state, - * its rebalance callback will be kept in the metadata if the - * consumer has sent the join group request - */ -@nonthreadsafe -private[coordinator] class ConsumerMetadata(val consumerId: String, - val groupId: String, - var topics: Set[String], - val sessionTimeoutMs: Int) { - - var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null - var assignedTopicPartitions = Set.empty[TopicAndPartition] - var latestHeartbeat: Long = -1 - var isLeaving: Boolean = false -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index a33231a..2279924 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -17,11 +17,8 @@ package kafka.coordinator -import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} -import kafka.utils.{threadsafe, ZkUtils, Logging} -import kafka.utils.ZkUtils._ -import org.I0Itec.zkclient.{ZkClient, IZkDataListener} +import kafka.utils.threadsafe import java.util.concurrent.locks.ReentrantReadWriteLock @@ -32,9 +29,7 @@ import scala.collection.mutable * It delegates all group logic to the callers. */ @threadsafe -private[coordinator] class CoordinatorMetadata(brokerId: Int, - zkUtils: ZkUtils, - maybePrepareRebalance: ConsumerGroupMetadata => Unit) { +private[coordinator] class CoordinatorMetadata(brokerId: Int) { /** * NOTE: If a group lock and metadataLock are simultaneously needed, @@ -45,24 +40,11 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int, /** * These should be guarded by metadataLock */ - private val groups = new mutable.HashMap[String, ConsumerGroupMetadata] - private val groupsPerTopic = new mutable.HashMap[String, Set[String]] - private val topicPartitionCounts = new mutable.HashMap[String, Int] - private val topicPartitionChangeListeners = new mutable.HashMap[String, TopicPartitionChangeListener] + private val groups = new mutable.HashMap[String, GroupMetadata] def shutdown() { inWriteLock(metadataLock) { - topicPartitionChangeListeners.keys.foreach(deregisterTopicPartitionChangeListener) - topicPartitionChangeListeners.clear() groups.clear() - groupsPerTopic.clear() - topicPartitionCounts.clear() - } - } - - def partitionsPerTopic = { - inReadLock(metadataLock) { - topicPartitionCounts.toMap } } @@ -78,148 +60,22 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int, /** * Add a group or get the group associated with the given groupId if it already exists */ - def addGroup(groupId: String, partitionAssignmentStrategy: String) = { + def addGroup(groupId: String, protocolType: String) = { inWriteLock(metadataLock) { - groups.getOrElseUpdate(groupId, new ConsumerGroupMetadata(groupId, partitionAssignmentStrategy)) + groups.getOrElseUpdate(groupId, new GroupMetadata(groupId, protocolType)) } } /** * Remove all metadata associated with the group, including its topics * @param groupId the groupId of the group we are removing - * @param topicsForGroup topics that consumers in the group were subscribed to */ - def removeGroup(groupId: String, topicsForGroup: Set[String]) { + def removeGroup(groupId: String) { inWriteLock(metadataLock) { - topicsForGroup.foreach(topic => unbindGroupFromTopics(groupId, topicsForGroup)) + if (!groups.contains(groupId)) + throw new IllegalArgumentException("Cannot remove non-existing group") groups.remove(groupId) } } - /** - * Add the given group to the set of groups interested in - * topic partition changes for the given topics - */ - def bindGroupToTopics(groupId: String, topics: Set[String]) { - inWriteLock(metadataLock) { - require(groups.contains(groupId), "CoordinatorMetadata can only bind existing groups") - topics.foreach(topic => bindGroupToTopic(groupId, topic)) - } - } - - /** - * Remove the given group from the set of groups interested in - * topic partition changes for the given topics - */ - def unbindGroupFromTopics(groupId: String, topics: Set[String]) { - inWriteLock(metadataLock) { - require(groups.contains(groupId), "CoordinatorMetadata can only unbind existing groups") - topics.foreach(topic => unbindGroupFromTopic(groupId, topic)) - } - } - - /** - * Add the given group to the set of groups interested in the topicsToBind and - * remove the given group from the set of groups interested in the topicsToUnbind - */ - def bindAndUnbindGroupFromTopics(groupId: String, topicsToBind: Set[String], topicsToUnbind: Set[String]) { - inWriteLock(metadataLock) { - require(groups.contains(groupId), "CoordinatorMetadata can only update topic bindings for existing groups") - topicsToBind.foreach(topic => bindGroupToTopic(groupId, topic)) - topicsToUnbind.foreach(topic => unbindGroupFromTopic(groupId, topic)) - } - } - - private def isListeningToTopic(topic: String) = topicPartitionChangeListeners.contains(topic) - - private def bindGroupToTopic(groupId: String, topic: String) { - if (isListeningToTopic(topic)) { - val currentGroupsForTopic = groupsPerTopic(topic) - groupsPerTopic.put(topic, currentGroupsForTopic + groupId) - } - else { - groupsPerTopic.put(topic, Set(groupId)) - topicPartitionCounts.put(topic, getTopicPartitionCountFromZK(topic)) - registerTopicPartitionChangeListener(topic) - } - } - - private def unbindGroupFromTopic(groupId: String, topic: String) { - if (isListeningToTopic(topic)) { - val remainingGroupsForTopic = groupsPerTopic(topic) - groupId - if (remainingGroupsForTopic.isEmpty) { - // no other group cares about the topic, so erase all metadata associated with the topic - groupsPerTopic.remove(topic) - topicPartitionCounts.remove(topic) - deregisterTopicPartitionChangeListener(topic) - } else { - groupsPerTopic.put(topic, remainingGroupsForTopic) - } - } - } - - private def getTopicPartitionCountFromZK(topic: String) = { - val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic)) - topicData(topic).size - } - - private def registerTopicPartitionChangeListener(topic: String) { - val listener = new TopicPartitionChangeListener - topicPartitionChangeListeners.put(topic, listener) - zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener) - } - - private def deregisterTopicPartitionChangeListener(topic: String) { - val listener = topicPartitionChangeListeners(topic) - zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener) - topicPartitionChangeListeners.remove(topic) - } - - /** - * Zookeeper listener to handle topic partition changes - */ - class TopicPartitionChangeListener extends IZkDataListener with Logging { - this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: " - - override def handleDataChange(dataPath: String, data: Object) { - info("Handling data change for path: %s data: %s".format(dataPath, data)) - val topic = topicFromDataPath(dataPath) - val numPartitions = getTopicPartitionCountFromZK(topic) - - val groupsToRebalance = inWriteLock(metadataLock) { - /* - * This condition exists because a consumer can leave and modify CoordinatorMetadata state - * while ZkClient begins handling the data change but before we acquire the metadataLock. - */ - if (isListeningToTopic(topic)) { - topicPartitionCounts.put(topic, numPartitions) - groupsPerTopic(topic).map(groupId => groups(groupId)) - } - else Set.empty[ConsumerGroupMetadata] - } - groupsToRebalance.foreach(maybePrepareRebalance) - } - - override def handleDataDeleted(dataPath: String) { - info("Handling data delete for path: %s".format(dataPath)) - val topic = topicFromDataPath(dataPath) - val groupsToRebalance = inWriteLock(metadataLock) { - /* - * This condition exists because a consumer can leave and modify CoordinatorMetadata state - * while ZkClient begins handling the data delete but before we acquire the metadataLock. - */ - if (isListeningToTopic(topic)) { - topicPartitionCounts.put(topic, 0) - groupsPerTopic(topic).map(groupId => groups(groupId)) - } - else Set.empty[ConsumerGroupMetadata] - } - groupsToRebalance.foreach(maybePrepareRebalance) - } - - private def topicFromDataPath(dataPath: String) = { - val nodes = dataPath.split("/") - nodes.last - } - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala index 70a710c..8e250c3 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala @@ -23,13 +23,13 @@ import kafka.server.DelayedOperation * Delayed heartbeat operations that are added to the purgatory for session timeout checking. * Heartbeats are paused during rebalance. */ -private[coordinator] class DelayedHeartbeat(consumerCoordinator: ConsumerCoordinator, - group: ConsumerGroupMetadata, - consumer: ConsumerMetadata, +private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator, + group: GroupMetadata, + member: MemberMetadata, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) { - override def tryComplete(): Boolean = consumerCoordinator.tryCompleteHeartbeat(group, consumer, heartbeatDeadline, forceComplete) - override def onExpiration() = consumerCoordinator.onExpirationHeartbeat(group, consumer, heartbeatDeadline) - override def onComplete() = consumerCoordinator.onCompleteHeartbeat() + override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete) + override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) + override def onComplete() = coordinator.onCompleteHeartbeat() } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedJoin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala new file mode 100644 index 0000000..ae96e15 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.server.DelayedOperation + +/** + * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance + * + * Whenever a join-group request is received, check if all known group members have requested + * to re-join the group; if yes, complete this operation to proceed rebalance. + * + * When the operation has expired, any known members that have not requested to re-join + * the group are marked as failed, and complete this operation to proceed rebalance with + * the rest of the group. + */ +private[coordinator] class DelayedJoin(coordinator: GroupCoordinator, + group: GroupMetadata, + sessionTimeout: Long) + extends DelayedOperation(sessionTimeout) { + + override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete) + override def onExpiration() = coordinator.onExpireJoin() + override def onComplete() = coordinator.onCompleteJoin(group) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala deleted file mode 100644 index 8247d33..0000000 --- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.server.DelayedOperation - -/** - * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance - * - * Whenever a join-group request is received, check if all known consumers have requested - * to re-join the group; if yes, complete this operation to proceed rebalance. - * - * When the operation has expired, any known consumers that have not requested to re-join - * the group are marked as failed, and complete this operation to proceed rebalance with - * the rest of the group. - */ -private[coordinator] class DelayedRebalance(consumerCoordinator: ConsumerCoordinator, - group: ConsumerGroupMetadata, - sessionTimeout: Long) - extends DelayedOperation(sessionTimeout) { - - override def tryComplete(): Boolean = consumerCoordinator.tryCompleteRebalance(group, forceComplete) - override def onExpiration() = consumerCoordinator.onExpirationRebalance() - override def onComplete() = consumerCoordinator.onCompleteRebalance(group) -}
