http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/other/kafka/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 549a96b..a77979a 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -135,8 +135,8 @@ object TestOffsetManager { val id = random.nextInt().abs % numGroups val group = "group-" + id try { - metadataChannel.send(ConsumerMetadataRequest(group)) - val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) + metadataChannel.send(GroupMetadataRequest(group)) + val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) val channel = if (channels.contains(coordinatorId)) channels(coordinatorId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index e2a75e2..1266598 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -35,18 +35,18 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private val HostsString = Hosts.mkString(AclCommand.Delimiter.toString) private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2")) - private val ConsumerGroupResources = Set(new Resource(ConsumerGroup, "testGroup-1"), new Resource(ConsumerGroup, "testGroup-2")) + private val GroupResources = Set(new Resource(Group, "testGroup-1"), new Resource(Group, "testGroup-2")) private val ResourceToCommand = Map[Set[Resource], Array[String]]( TopicResources -> Array("--topic", "test-1,test-2"), Set(Resource.ClusterResource) -> Array("--cluster"), - ConsumerGroupResources -> Array("--consumer-group", "testGroup-1,testGroup-2") + GroupResources -> Array("--group", "testGroup-1,testGroup-2") ) private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( TopicResources -> (Set(Read, Write, Describe), Array("--operations", "Read,Write,Describe")), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operations", "Create,ClusterAction")), - ConsumerGroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read")) + GroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read")) ) private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]]( @@ -56,7 +56,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]]( TopicResources -> AclCommand.getAcls(Users, Allow, Set(Read, Describe), Hosts), - ConsumerGroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts) + GroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts) ) private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]]( http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index cab4813..820a825 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -24,7 +24,7 @@ import kafka.zk.ZooKeeperTestHarness import kafka.server.ConfigType import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils._ -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.GroupCoordinator class TopicCommandTest extends ZooKeeperTestHarness with Logging { @@ -85,12 +85,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging { // create the offset topic val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", - "--topic", ConsumerCoordinator.OffsetsTopicName)) + "--topic", GroupCoordinator.OffsetsTopicName)) TopicCommand.createTopic(zkUtils, createOffsetTopicOpts) // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't - val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName)) - val deleteOffsetTopicPath = getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName) + val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.OffsetsTopicName)) + val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.OffsetsTopicName) assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath)) intercept[AdminOperationException] { TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts) http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index b7e7967..09e9ce3 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -231,12 +231,12 @@ object SerializationTestUtils { )) } - def createConsumerMetadataRequest: ConsumerMetadataRequest = { - ConsumerMetadataRequest("group 1", clientId = "client 1") + def createConsumerMetadataRequest: GroupMetadataRequest = { + GroupMetadataRequest("group 1", clientId = "client 1") } - def createConsumerMetadataResponse: ConsumerMetadataResponse = { - ConsumerMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0) + def createConsumerMetadataResponse: GroupMetadataResponse = { + GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0) } def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = { @@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite { private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse - private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) + private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0) private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1) private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index 2e18e92..24fba45 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -22,7 +22,7 @@ import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.Test import kafka.server.OffsetManager -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.GroupCoordinator class TopicFilterTest extends JUnitSuite { @@ -38,8 +38,8 @@ class TopicFilterTest extends JUnitSuite { val topicFilter2 = new Whitelist(".+") assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true)) - assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true)) + assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false)) val topicFilter3 = new Whitelist("white_listed-topic.+") assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true)) @@ -58,8 +58,8 @@ class TopicFilterTest extends JUnitSuite { assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false)) - assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = true)) + assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.OffsetsTopicName, excludeInternalTopics = false)) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala deleted file mode 100644 index c108955..0000000 --- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala +++ /dev/null @@ -1,447 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - - -import java.util.concurrent.TimeUnit - -import org.junit.Assert._ -import kafka.common.{OffsetAndMetadata, TopicAndPartition} -import kafka.server.{OffsetManager, KafkaConfig} -import kafka.utils.TestUtils -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest} -import org.easymock.{IAnswer, EasyMock} -import org.junit.{After, Before, Test} -import org.scalatest.junit.JUnitSuite - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future, Promise} - -/** - * Test ConsumerCoordinator responses - */ -class ConsumerCoordinatorResponseTest extends JUnitSuite { - type JoinGroupCallbackParams = (Set[TopicAndPartition], String, Int, Short) - type JoinGroupCallback = (Set[TopicAndPartition], String, Int, Short) => Unit - type HeartbeatCallbackParams = Short - type HeartbeatCallback = Short => Unit - type CommitOffsetCallbackParams = Map[TopicAndPartition, Short] - type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit - type LeaveGroupCallbackParams = Short - type LeaveGroupCallback = Short => Unit - - val ConsumerMinSessionTimeout = 10 - val ConsumerMaxSessionTimeout = 200 - val DefaultSessionTimeout = 100 - var consumerCoordinator: ConsumerCoordinator = null - var offsetManager : OffsetManager = null - - @Before - def setUp() { - val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") - props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString) - props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString) - offsetManager = EasyMock.createStrictMock(classOf[OffsetManager]) - consumerCoordinator = ConsumerCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager) - consumerCoordinator.startup() - } - - @After - def tearDown() { - EasyMock.reset(offsetManager) - consumerCoordinator.shutdown() - } - - @Test - def testJoinGroupWrongCoordinator() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = false) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, joinGroupErrorCode) - } - - @Test - def testJoinGroupUnknownPartitionAssignmentStrategy() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "foo" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code, joinGroupErrorCode) - } - - @Test - def testJoinGroupSessionTimeoutTooSmall() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMinSessionTimeout - 1, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode) - } - - @Test - def testJoinGroupSessionTimeoutTooLarge() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMaxSessionTimeout + 1, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode) - } - - @Test - def testJoinGroupUnknownConsumerNewGroup() { - val groupId = "groupId" - val consumerId = "consumerId" - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, joinGroupErrorCode) - } - - @Test - def testValidJoinGroup() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - } - - @Test - def testJoinGroupInconsistentPartitionAssignmentStrategy() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - val otherPartitionAssignmentStrategy = "roundrobin" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, otherPartitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val otherJoinGroupErrorCode = otherJoinGroupResult._4 - assertEquals(Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code, otherJoinGroupErrorCode) - } - - @Test - def testJoinGroupUnknownConsumerExistingGroup() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val otherConsumerId = "consumerId" - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val otherJoinGroupErrorCode = otherJoinGroupResult._4 - assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, otherJoinGroupErrorCode) - } - - @Test - def testHeartbeatWrongCoordinator() { - val groupId = "groupId" - val consumerId = "consumerId" - - val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false) - assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, heartbeatResult) - } - - @Test - def testHeartbeatUnknownGroup() { - val groupId = "groupId" - val consumerId = "consumerId" - - val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true) - assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult) - } - - @Test - def testHeartbeatUnknownConsumerExistingGroup() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val otherConsumerId = "consumerId" - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val heartbeatResult = heartbeat(groupId, otherConsumerId, 1, isCoordinatorForGroup = true) - assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, heartbeatResult) - } - - @Test - def testHeartbeatIllegalGeneration() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val assignedConsumerId = joinGroupResult._2 - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val heartbeatResult = heartbeat(groupId, assignedConsumerId, 2, isCoordinatorForGroup = true) - assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult) - } - - @Test - def testValidHeartbeat() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val assignedConsumerId = joinGroupResult._2 - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true) - assertEquals(Errors.NONE.code, heartbeatResult) - } - - @Test - def testCommitOffsetFromUnknownGroup() { - val groupId = "groupId" - val consumerId = "consumer" - val generationId = 1 - val tp = new TopicAndPartition("topic", 0) - val offset = OffsetAndMetadata(0) - - val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true) - assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp)) - } - - @Test - def testCommitOffsetWithDefaultGeneration() { - val groupId = "groupId" - val tp = new TopicAndPartition("topic", 0) - val offset = OffsetAndMetadata(0) - - val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_CONSUMER_ID, - OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true) - assertEquals(Errors.NONE.code, commitOffsetResult(tp)) - } - - @Test - def testHeartbeatDuringRebalanceCausesRebalanceInProgress() { - val groupId = "groupId" - val partitionAssignmentStrategy = "range" - - // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts) - val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy, - DefaultSessionTimeout, isCoordinatorForGroup = true) - val assignedConsumerId = joinGroupResult._2 - val initialGenerationId = joinGroupResult._3 - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - // Then join with a new consumer to trigger a rebalance - EasyMock.reset(offsetManager) - sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy, - DefaultSessionTimeout, isCoordinatorForGroup = true) - - // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress - EasyMock.reset(offsetManager) - val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true) - assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult) - } - - @Test - def testGenerationIdIncrementsOnRebalance() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val initialGenerationId = joinGroupResult._3 - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(1, initialGenerationId) - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val otherJoinGroupResult = joinGroup(groupId, otherConsumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val nextGenerationId = otherJoinGroupResult._3 - val otherJoinGroupErrorCode = otherJoinGroupResult._4 - assertEquals(2, nextGenerationId) - assertEquals(Errors.NONE.code, otherJoinGroupErrorCode) - } - - @Test - def testLeaveGroupWrongCoordinator() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - - val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = false) - assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, leaveGroupResult) - } - - @Test - def testLeaveGroupUnknownGroup() { - val groupId = "groupId" - val consumerId = "consumerId" - - val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = true) - assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult) - } - - @Test - def testLeaveGroupUnknownConsumerExistingGroup() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val otherConsumerId = "consumerId" - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val leaveGroupResult = leaveGroup(groupId, otherConsumerId, isCoordinatorForGroup = true) - assertEquals(Errors.UNKNOWN_CONSUMER_ID.code, leaveGroupResult) - } - - @Test - def testValidLeaveGroup() { - val groupId = "groupId" - val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID - val partitionAssignmentStrategy = "range" - - val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true) - val assignedConsumerId = joinGroupResult._2 - val joinGroupErrorCode = joinGroupResult._4 - assertEquals(Errors.NONE.code, joinGroupErrorCode) - - EasyMock.reset(offsetManager) - val leaveGroupResult = leaveGroup(groupId, assignedConsumerId, isCoordinatorForGroup = true) - assertEquals(Errors.NONE.code, leaveGroupResult) - } - - private def setupJoinGroupCallback: (Future[JoinGroupCallbackParams], JoinGroupCallback) = { - val responsePromise = Promise[JoinGroupCallbackParams] - val responseFuture = responsePromise.future - val responseCallback: JoinGroupCallback = (partitions, consumerId, generationId, errorCode) => - responsePromise.success((partitions, consumerId, generationId, errorCode)) - (responseFuture, responseCallback) - } - - private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = { - val responsePromise = Promise[HeartbeatCallbackParams] - val responseFuture = responsePromise.future - val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode) - (responseFuture, responseCallback) - } - - private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = { - val responsePromise = Promise[CommitOffsetCallbackParams] - val responseFuture = responsePromise.future - val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets) - (responseFuture, responseCallback) - } - - private def setupLeaveGroupCallback: (Future[LeaveGroupCallbackParams], LeaveGroupCallback) = { - val responsePromise = Promise[LeaveGroupCallbackParams] - val responseFuture = responsePromise.future - val responseCallback: LeaveGroupCallback = errorCode => responsePromise.success(errorCode) - (responseFuture, responseCallback) - } - - private def sendJoinGroup(groupId: String, - consumerId: String, - partitionAssignmentStrategy: String, - sessionTimeout: Int, - isCoordinatorForGroup: Boolean): Future[JoinGroupCallbackParams] = { - val (responseFuture, responseCallback) = setupJoinGroupCallback - EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) - EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) - EasyMock.replay(offsetManager) - consumerCoordinator.handleJoinGroup(groupId, consumerId, Set.empty, sessionTimeout, partitionAssignmentStrategy, responseCallback) - responseFuture - } - - private def joinGroup(groupId: String, - consumerId: String, - partitionAssignmentStrategy: String, - sessionTimeout: Int, - isCoordinatorForGroup: Boolean): JoinGroupCallbackParams = { - val responseFuture = sendJoinGroup(groupId, consumerId, partitionAssignmentStrategy, sessionTimeout, isCoordinatorForGroup) - // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay - Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS)) - } - - private def heartbeat(groupId: String, - consumerId: String, - generationId: Int, - isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = { - val (responseFuture, responseCallback) = setupHeartbeatCallback - EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) - EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) - EasyMock.replay(offsetManager) - consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback) - Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) - } - - private def commitOffsets(groupId: String, - consumerId: String, - generationId: Int, - offsets: Map[TopicAndPartition, OffsetAndMetadata], - isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = { - val (responseFuture, responseCallback) = setupCommitOffsetsCallback - EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) - EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) - val storeOffsetAnswer = new IAnswer[Unit] { - override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code)) - } - EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback)) - .andAnswer(storeOffsetAnswer) - EasyMock.replay(offsetManager) - consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback) - Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) - Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) - } - - private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = { - val (responseFuture, responseCallback) = setupHeartbeatCallback - EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) - EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) - EasyMock.replay(offsetManager) - consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback) - Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala deleted file mode 100644 index 5d812c2..0000000 --- a/core/src/test/scala/unit/kafka/coordinator/ConsumerGroupMetadataTest.scala +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import org.junit.Assert._ -import org.junit.{Before, Test} -import org.scalatest.junit.JUnitSuite - -/** - * Test group state transitions - */ -class ConsumerGroupMetadataTest extends JUnitSuite { - var group: ConsumerGroupMetadata = null - - @Before - def setUp() { - group = new ConsumerGroupMetadata("test", "range") - } - - @Test - def testCanRebalanceWhenStable() { - assertTrue(group.canRebalance) - } - - @Test - def testCannotRebalanceWhenPreparingRebalance() { - group.transitionTo(PreparingRebalance) - assertFalse(group.canRebalance) - } - - @Test - def testCannotRebalanceWhenRebalancing() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - assertFalse(group.canRebalance) - } - - @Test - def testCannotRebalanceWhenDead() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - assertFalse(group.canRebalance) - } - - @Test - def testStableToPreparingRebalanceTransition() { - group.transitionTo(PreparingRebalance) - assertState(group, PreparingRebalance) - } - - @Test - def testPreparingRebalanceToRebalancingTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - assertState(group, Rebalancing) - } - - @Test - def testPreparingRebalanceToDeadTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - assertState(group, Dead) - } - - @Test - def testRebalancingToStableTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(Stable) - assertState(group, Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testStableToStableIllegalTransition() { - group.transitionTo(Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testStableToRebalancingIllegalTransition() { - group.transitionTo(Rebalancing) - } - - @Test(expected = classOf[IllegalStateException]) - def testStableToDeadIllegalTransition() { - group.transitionTo(Dead) - } - - @Test(expected = classOf[IllegalStateException]) - def testPreparingRebalanceToPreparingRebalanceIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(PreparingRebalance) - } - - @Test(expected = classOf[IllegalStateException]) - def testPreparingRebalanceToStableIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testRebalancingToRebalancingIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(Rebalancing) - } - - @Test(expected = classOf[IllegalStateException]) - def testRebalancingToPreparingRebalanceTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(PreparingRebalance) - } - - @Test(expected = classOf[IllegalStateException]) - def testRebalancingToDeadIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Rebalancing) - group.transitionTo(Dead) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToDeadIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(Dead) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToStableIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(Stable) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToPreparingRebalanceIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(PreparingRebalance) - } - - @Test(expected = classOf[IllegalStateException]) - def testDeadToRebalancingIllegalTransition() { - group.transitionTo(PreparingRebalance) - group.transitionTo(Dead) - group.transitionTo(Rebalancing) - } - - private def assertState(group: ConsumerGroupMetadata, targetState: GroupState) { - val states: Set[GroupState] = Set(Stable, PreparingRebalance, Rebalancing, Dead) - val otherStates = states - targetState - otherStates.foreach { otherState => - assertFalse(group.is(otherState)) - } - assertTrue(group.is(targetState)) - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala index 3bc37e5..49a237b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala @@ -18,13 +18,9 @@ package kafka.coordinator import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, ZkUtils} -import kafka.utils.ZkUtils._ +import kafka.utils.TestUtils import org.junit.Assert._ -import org.I0Itec.zkclient.{IZkDataListener, ZkClient} -import org.apache.zookeeper.data.Stat -import org.easymock.EasyMock import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite @@ -34,15 +30,12 @@ import org.scalatest.junit.JUnitSuite class CoordinatorMetadataTest extends JUnitSuite { val DefaultNumPartitions = 8 val DefaultNumReplicas = 2 - var zkUtils: ZkUtils = null var coordinatorMetadata: CoordinatorMetadata = null @Before def setUp() { val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") - val zkClient = EasyMock.createStrictMock(classOf[ZkClient]) - zkUtils = ZkUtils(zkClient, false) - coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkUtils, null) + coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId) } @Test @@ -53,7 +46,8 @@ class CoordinatorMetadataTest extends JUnitSuite { @Test def testGetGroup() { val groupId = "group" - val expected = coordinatorMetadata.addGroup(groupId, "range") + val protocolType = "consumer" + val expected = coordinatorMetadata.addGroup(groupId, protocolType) val actual = coordinatorMetadata.getGroup(groupId) assertEquals(expected, actual) } @@ -61,155 +55,17 @@ class CoordinatorMetadataTest extends JUnitSuite { @Test def testAddGroupReturnsPreexistingGroupIfItAlreadyExists() { val groupId = "group" - val group1 = coordinatorMetadata.addGroup(groupId, "range") - val group2 = coordinatorMetadata.addGroup(groupId, "range") + val protocolType = "consumer" + val group1 = coordinatorMetadata.addGroup(groupId, protocolType) + val group2 = coordinatorMetadata.addGroup(groupId, protocolType) assertEquals(group1, group2) } @Test(expected = classOf[IllegalArgumentException]) - def testBindNonexistentGroupToTopics() { - val groupId = "group" - val topics = Set("a") - coordinatorMetadata.bindGroupToTopics(groupId, topics) - } - - @Test - def testBindGroupToTopicsNotListenedOn() { - val groupId = "group" - val topics = Set("a") - coordinatorMetadata.addGroup(groupId, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(groupId, topics) - assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) - } - - @Test - def testBindGroupToTopicsAlreadyListenedOn() { - val group1 = "group1" - val group2 = "group2" - val topics = Set("a") - coordinatorMetadata.addGroup(group1, "range") - coordinatorMetadata.addGroup(group2, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(group1, topics) - coordinatorMetadata.bindGroupToTopics(group2, topics) - assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) - } - - @Test(expected = classOf[IllegalArgumentException]) - def testUnbindNonexistentGroupFromTopics() { - val groupId = "group" - val topics = Set("a") - coordinatorMetadata.unbindGroupFromTopics(groupId, topics) - } - - @Test - def testUnbindGroupFromTopicsNotListenedOn() { - val groupId = "group" - val topics = Set("a") - coordinatorMetadata.addGroup(groupId, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(groupId, topics) - coordinatorMetadata.unbindGroupFromTopics(groupId, Set("b")) - assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) - } - - @Test - def testUnbindGroupFromTopicsListenedOnByOtherGroups() { - val group1 = "group1" - val group2 = "group2" - val topics = Set("a") - coordinatorMetadata.addGroup(group1, "range") - coordinatorMetadata.addGroup(group2, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(group1, topics) - coordinatorMetadata.bindGroupToTopics(group2, topics) - coordinatorMetadata.unbindGroupFromTopics(group1, topics) - assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) - } - - @Test - def testUnbindGroupFromTopicsListenedOnByNoOtherGroup() { - val groupId = "group" - val topics = Set("a") - coordinatorMetadata.addGroup(groupId, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(groupId, topics) - coordinatorMetadata.unbindGroupFromTopics(groupId, topics) - assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic) - } - - @Test(expected = classOf[IllegalArgumentException]) def testRemoveNonexistentGroup() { val groupId = "group" val topics = Set("a") - coordinatorMetadata.removeGroup(groupId, topics) - } - - @Test - def testRemoveGroupWithOtherGroupsBoundToItsTopics() { - val group1 = "group1" - val group2 = "group2" - val topics = Set("a") - coordinatorMetadata.addGroup(group1, "range") - coordinatorMetadata.addGroup(group2, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(group1, topics) - coordinatorMetadata.bindGroupToTopics(group2, topics) - coordinatorMetadata.removeGroup(group1, topics) - assertNull(coordinatorMetadata.getGroup(group1)) - assertNotNull(coordinatorMetadata.getGroup(group2)) - assertEquals(Map("a" -> DefaultNumPartitions), coordinatorMetadata.partitionsPerTopic) - } - - @Test - def testRemoveGroupWithNoOtherGroupsBoundToItsTopics() { - val groupId = "group" - val topics = Set("a") - coordinatorMetadata.addGroup(groupId, "range") - - expectZkClientSubscribeDataChanges(zkUtils, topics) - expectZkClientUnsubscribeDataChanges(zkUtils.zkClient, topics) - EasyMock.replay(zkUtils.zkClient) - coordinatorMetadata.bindGroupToTopics(groupId, topics) - coordinatorMetadata.removeGroup(groupId, topics) - assertNull(coordinatorMetadata.getGroup(groupId)) - assertEquals(Map.empty[String, Int], coordinatorMetadata.partitionsPerTopic) + coordinatorMetadata.removeGroup(groupId) } - private def expectZkClientSubscribeDataChanges(zkUtils: ZkUtils, topics: Set[String]) { - topics.foreach(topic => expectZkClientSubscribeDataChange(zkUtils.zkClient, topic)) - } - - private def expectZkClientUnsubscribeDataChanges(zkClient: ZkClient, topics: Set[String]) { - topics.foreach(topic => expectZkClientUnsubscribeDataChange(zkClient, topic)) - } - - private def expectZkClientSubscribeDataChange(zkClient: ZkClient, topic: String) { - val replicaAssignment = - (0 until DefaultNumPartitions) - .map(partition => partition.toString -> (0 until DefaultNumReplicas).toSeq).toMap - val topicPath = getTopicPath(topic) - EasyMock.expect(zkClient.readData(topicPath, new Stat())) - .andReturn(zkUtils.replicaAssignmentZkData(replicaAssignment)) - zkClient.subscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener])) - } - - private def expectZkClientUnsubscribeDataChange(zkClient: ZkClient, topic: String) { - val topicPath = getTopicPath(topic) - zkClient.unsubscribeDataChanges(EasyMock.eq(topicPath), EasyMock.isA(classOf[IZkDataListener])) - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala new file mode 100644 index 0000000..cdd78ef --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -0,0 +1,907 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + + +import java.util.concurrent.TimeUnit + +import org.junit.Assert._ +import kafka.common.{OffsetAndMetadata, TopicAndPartition} +import kafka.server.{OffsetManager, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest} +import org.easymock.{IAnswer, EasyMock} +import org.junit.{After, Before, Test} +import org.scalatest.junit.JUnitSuite + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future, Promise} + +/** + * Test GroupCoordinator responses + */ +class GroupCoordinatorResponseTest extends JUnitSuite { + type JoinGroupCallback = JoinGroupResult => Unit + type SyncGroupCallbackParams = (Array[Byte], Short) + type SyncGroupCallback = (Array[Byte], Short) => Unit + type HeartbeatCallbackParams = Short + type HeartbeatCallback = Short => Unit + type CommitOffsetCallbackParams = Map[TopicAndPartition, Short] + type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit + type LeaveGroupCallbackParams = Short + type LeaveGroupCallback = Short => Unit + + val ConsumerMinSessionTimeout = 10 + val ConsumerMaxSessionTimeout = 1000 + val DefaultSessionTimeout = 500 + var consumerCoordinator: GroupCoordinator = null + var offsetManager : OffsetManager = null + + @Before + def setUp() { + val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString) + props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString) + offsetManager = EasyMock.createStrictMock(classOf[OffsetManager]) + consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager) + consumerCoordinator.startup() + } + + @After + def tearDown() { + EasyMock.reset(offsetManager) + consumerCoordinator.shutdown() + } + + @Test + def testJoinGroupWrongCoordinator() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, + protocols, isCoordinatorForGroup = false) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupSessionTimeoutTooSmall() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, ConsumerMinSessionTimeout - 1, protocolType, protocols, + isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupSessionTimeoutTooLarge() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, ConsumerMaxSessionTimeout + 1, protocolType, protocols, + isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.INVALID_SESSION_TIMEOUT.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupUnknownConsumerNewGroup() { + val groupId = "groupId" + val memberId = "memberId" + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode) + } + + @Test + def testValidJoinGroup() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, + protocols, isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + } + + @Test + def testJoinGroupInconsistentProtocolType() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + + val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, "consumer", List(("range", metadata)), + isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", + List(("range", metadata)), isCoordinatorForGroup = true) + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode) + } + + @Test + def testJoinGroupInconsistentGroupProtocol() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val protocolType = "consumer" + val metadata = Array[Byte]() + + val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, List(("range", metadata)), + isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, + List(("roundrobin", metadata)), isCoordinatorForGroup = true) + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode) + } + + @Test + def testJoinGroupUnknownConsumerExistingGroup() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = "memberId" + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, otherJoinGroupResult.errorCode) + } + + @Test + def testHeartbeatWrongCoordinator() { + val groupId = "groupId" + val consumerId = "memberId" + + val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = false) + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, heartbeatResult) + } + + @Test + def testHeartbeatUnknownGroup() { + val groupId = "groupId" + val consumerId = "memberId" + + val heartbeatResult = heartbeat(groupId, consumerId, -1, isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult) + } + + @Test + def testHeartbeatUnknownConsumerExistingGroup() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = "memberId" + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, joinGroupResult.memberId, Map.empty, true) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, otherMemberId, 1, isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult) + } + + @Test + def testHeartbeatRebalanceInProgress() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedMemberId = joinGroupResult.memberId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true) + assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult) + } + + @Test + def testHeartbeatIllegalGeneration() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedMemberId = joinGroupResult.memberId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map.empty, true) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedMemberId, 2, isCoordinatorForGroup = true) + assertEquals(Errors.ILLEGAL_GENERATION.code, heartbeatResult) + } + + @Test + def testValidHeartbeat() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1, isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, heartbeatResult) + } + + @Test + def testSyncGroupNotCoordinator() { + val groupId = "groupId" + val memberId = "member" + val generation = 1 + + val syncGroupResult = syncGroupFollower(groupId, generation, memberId, false) + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, syncGroupResult._2) + } + + @Test + def testSyncGroupFromUnknownGroup() { + val groupId = "groupId" + val memberId = "member" + val generation = 1 + + val syncGroupResult = syncGroupFollower(groupId, generation, memberId, true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, syncGroupResult._2) + } + + @Test + def testSyncGroupFromUnknownMember() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(offsetManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map.empty, true) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(offsetManager) + val unknownMemberId = "blah" + val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId, true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, unknownMemberSyncResult._2) + } + + @Test + def testSyncGroupFromIllegalGeneration() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(offsetManager) + // send the sync group with an invalid generation + val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map.empty, true) + assertEquals(Errors.ILLEGAL_GENERATION.code, syncGroupResult._2) + } + + @Test + def testJoinGroupFromUnchangedFollowerDoesNotRebalance() { + val groupId = "groupId" + val protocolType = "consumer" + val protocols = List(("range", Array[Byte]())) + + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. join and sync with the first member and a new member + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE.code, firstJoinResult.errorCode) + + EasyMock.reset(offsetManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true) + assertEquals(Errors.NONE.code, firstSyncResult._2) + + EasyMock.reset(offsetManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + + EasyMock.reset(offsetManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true); + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE.code, joinResult.errorCode) + assertEquals(Errors.NONE.code, otherJoinResult.errorCode) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + val nextGenerationId = joinResult.generationId + + // this shouldn't cause a rebalance since protocol information hasn't changed + EasyMock.reset(offsetManager) + val followerJoinResult = joinGroup(groupId, otherJoinResult.memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + + assertEquals(Errors.NONE.code, followerJoinResult.errorCode) + assertEquals(nextGenerationId, followerJoinResult.generationId) + } + + @Test + def testJoinGroupFromUnchangedLeaderShouldRebalance() { + val groupId = "groupId" + val protocolType = "consumer" + val protocols = List(("range", Array[Byte]())) + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE.code, firstJoinResult.errorCode) + + EasyMock.reset(offsetManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true) + assertEquals(Errors.NONE.code, firstSyncResult._2) + + // join groups from the leader should force the group to rebalance, which allows the + // leader to push new assignments when local metadata changes + + EasyMock.reset(offsetManager) + val secondJoinResult = joinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + + assertEquals(Errors.NONE.code, secondJoinResult.errorCode) + assertNotEquals(firstGenerationId, secondJoinResult.generationId) + } + + @Test + def testLeaderFailureInSyncGroup() { + val groupId = "groupId" + val protocolType = "consumer" + val protocols = List(("range", Array[Byte]())) + + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. join and sync with the first member and a new member + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE.code, firstJoinResult.errorCode) + + EasyMock.reset(offsetManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true) + assertEquals(Errors.NONE.code, firstSyncResult._2) + + EasyMock.reset(offsetManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + + EasyMock.reset(offsetManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true); + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE.code, joinResult.errorCode) + assertEquals(Errors.NONE.code, otherJoinResult.errorCode) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + val nextGenerationId = joinResult.generationId + + // with no leader SyncGroup, the follower's request should failure with an error indicating + // that it should rejoin + EasyMock.reset(offsetManager) + val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, + isCoordinatorForGroup = true) + val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100) + assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2) + } + + @Test + def testSyncGroupFollowerAfterLeader() { + val groupId = "groupId" + val protocolType = "consumer" + val protocols = List(("range", Array[Byte]())) + + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. join and sync with the first member and a new member + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE.code, firstJoinResult.errorCode) + + EasyMock.reset(offsetManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true) + assertEquals(Errors.NONE.code, firstSyncResult._2) + + EasyMock.reset(offsetManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + + EasyMock.reset(offsetManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true); + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE.code, joinResult.errorCode) + assertEquals(Errors.NONE.code, otherJoinResult.errorCode) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + val nextGenerationId = joinResult.generationId + val leaderId = firstMemberId + val leaderAssignment = Array[Byte](0) + val followerId = otherJoinResult.memberId + val followerAssignment = Array[Byte](1) + + EasyMock.reset(offsetManager) + val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId, + Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, leaderSyncResult._2) + assertEquals(leaderAssignment, leaderSyncResult._1) + + EasyMock.reset(offsetManager) + val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId, + isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, followerSyncResult._2) + assertEquals(followerAssignment, followerSyncResult._1) + } + + @Test + def testSyncGroupLeaderAfterFollower() { + val groupId = "groupId" + val protocolType = "consumer" + val protocols = List(("range", Array[Byte]())) + + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. join and sync with the first member and a new member + + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + val firstMemberId = joinGroupResult.memberId + val firstGenerationId = joinGroupResult.generationId + assertEquals(firstMemberId, joinGroupResult.leaderId) + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(offsetManager) + val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map.empty, true) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(offsetManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + + EasyMock.reset(offsetManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true); + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE.code, joinResult.errorCode) + assertEquals(Errors.NONE.code, otherJoinResult.errorCode) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + val nextGenerationId = joinResult.generationId + val leaderId = joinResult.leaderId + val leaderAssignment = Array[Byte](0) + val followerId = otherJoinResult.memberId + val followerAssignment = Array[Byte](1) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + EasyMock.reset(offsetManager) + val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId, isCoordinatorForGroup = true) + + EasyMock.reset(offsetManager) + val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId, + Map(leaderId -> leaderAssignment, followerId -> followerAssignment), isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, leaderSyncResult._2) + assertEquals(leaderAssignment, leaderSyncResult._1) + + val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE.code, followerSyncResult._2) + assertEquals(followerAssignment, followerSyncResult._1) + } + + @Test + def testCommitOffsetFromUnknownGroup() { + val groupId = "groupId" + val consumerId = "consumer" + val generationId = 1 + val tp = new TopicAndPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val commitOffsetResult = commitOffsets(groupId, consumerId, generationId, Map(tp -> offset), true) + assertEquals(Errors.ILLEGAL_GENERATION.code, commitOffsetResult(tp)) + } + + @Test + def testCommitOffsetWithDefaultGeneration() { + val groupId = "groupId" + val tp = new TopicAndPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true) + assertEquals(Errors.NONE.code, commitOffsetResult(tp)) + } + + @Test + def testCommitOffsetInAwaitingSync() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + val tp = new TopicAndPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset), true) + assertEquals(Errors.REBALANCE_IN_PROGRESS.code, commitOffsetResult(tp)) + } + + @Test + def testHeartbeatDuringRebalanceCausesRebalanceInProgress() { + val groupId = "groupId" + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts) + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, + protocolType, protocols, isCoordinatorForGroup = true) + val assignedConsumerId = joinGroupResult.memberId + val initialGenerationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + // Then join with a new consumer to trigger a rebalance + EasyMock.reset(offsetManager) + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + + // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress + EasyMock.reset(offsetManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId, isCoordinatorForGroup = true) + assertEquals(Errors.REBALANCE_IN_PROGRESS.code, heartbeatResult) + } + + @Test + def testGenerationIdIncrementsOnRebalance() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val initialGenerationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(1, initialGenerationId) + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val nextGenerationId = otherJoinGroupResult.generationId + val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode + assertEquals(2, nextGenerationId) + assertEquals(Errors.NONE.code, otherJoinGroupErrorCode) + } + + @Test + def testLeaveGroupWrongCoordinator() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val leaveGroupResult = leaveGroup(groupId, memberId, isCoordinatorForGroup = false) + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, leaveGroupResult) + } + + @Test + def testLeaveGroupUnknownGroup() { + val groupId = "groupId" + val memberId = "consumerId" + + val leaveGroupResult = leaveGroup(groupId, memberId, isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult) + } + + @Test + def testLeaveGroupUnknownConsumerExistingGroup() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = "consumerId" + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val leaveGroupResult = leaveGroup(groupId, otherMemberId, isCoordinatorForGroup = true) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, leaveGroupResult) + } + + @Test + def testValidLeaveGroup() { + val groupId = "groupId" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val metadata = Array[Byte]() + val protocolType = "consumer" + val protocols = List(("range", metadata)) + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols, + isCoordinatorForGroup = true) + val assignedMemberId = joinGroupResult.memberId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(offsetManager) + val leaveGroupResult = leaveGroup(groupId, assignedMemberId, isCoordinatorForGroup = true) + assertEquals(Errors.NONE.code, leaveGroupResult) + } + + private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = { + val responsePromise = Promise[JoinGroupResult] + val responseFuture = responsePromise.future + val responseCallback: JoinGroupCallback = responsePromise.success(_) + (responseFuture, responseCallback) + } + + private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = { + val responsePromise = Promise[SyncGroupCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: SyncGroupCallback = (assignment, errorCode) => + responsePromise.success((assignment, errorCode)) + (responseFuture, responseCallback) + } + + private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = { + val responsePromise = Promise[HeartbeatCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: HeartbeatCallback = errorCode => responsePromise.success(errorCode) + (responseFuture, responseCallback) + } + + private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = { + val responsePromise = Promise[CommitOffsetCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets) + (responseFuture, responseCallback) + } + + private def sendJoinGroup(groupId: String, + memberId: String, + sessionTimeout: Int, + protocolType: String, + protocols: List[(String, Array[Byte])], + isCoordinatorForGroup: Boolean): Future[JoinGroupResult] = { + val (responseFuture, responseCallback) = setupJoinGroupCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, responseCallback) + responseFuture + } + + + private def sendSyncGroupLeader(groupId: String, + generation: Int, + leaderId: String, + assignment: Map[String, Array[Byte]], + isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = { + val (responseFuture, responseCallback) = setupSyncGroupCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback) + responseFuture + } + + private def sendSyncGroupFollower(groupId: String, + generation: Int, + memberId: String, + isCoordinatorForGroup: Boolean): Future[SyncGroupCallbackParams] = { + val (responseFuture, responseCallback) = setupSyncGroupCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback) + responseFuture + } + + private def joinGroup(groupId: String, + memberId: String, + sessionTimeout: Int, + protocolType: String, + protocols: List[(String, Array[Byte])], + isCoordinatorForGroup: Boolean): JoinGroupResult = { + val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols, isCoordinatorForGroup) + // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay + Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS)) + } + + + private def syncGroupFollower(groupId: String, + generationId: Int, + memberId: String, + isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = { + val responseFuture = sendSyncGroupFollower(groupId, generationId, memberId, isCoordinatorForGroup) + Await.result(responseFuture, Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS)) + } + + private def syncGroupLeader(groupId: String, + generationId: Int, + memberId: String, + assignment: Map[String, Array[Byte]], + isCoordinatorForGroup: Boolean): SyncGroupCallbackParams = { + val responseFuture = sendSyncGroupLeader(groupId, generationId, memberId, assignment, isCoordinatorForGroup) + Await.result(responseFuture, Duration(DefaultSessionTimeout+100, TimeUnit.MILLISECONDS)) + } + + private def heartbeat(groupId: String, + consumerId: String, + generationId: Int, + isCoordinatorForGroup: Boolean): HeartbeatCallbackParams = { + val (responseFuture, responseCallback) = setupHeartbeatCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback) + Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) + } + + private def await[T](future: Future[T], millis: Long): T = { + Await.result(future, Duration(millis, TimeUnit.MILLISECONDS)) + } + + private def commitOffsets(groupId: String, + consumerId: String, + generationId: Int, + offsets: Map[TopicAndPartition, OffsetAndMetadata], + isCoordinatorForGroup: Boolean): CommitOffsetCallbackParams = { + val (responseFuture, responseCallback) = setupCommitOffsetsCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + val storeOffsetAnswer = new IAnswer[Unit] { + override def answer = responseCallback.apply(offsets.mapValues(_ => Errors.NONE.code)) + } + EasyMock.expect(offsetManager.storeOffsets(groupId, consumerId, generationId, offsets, responseCallback)) + .andAnswer(storeOffsetAnswer) + EasyMock.replay(offsetManager) + consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback) + Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) + } + + private def leaveGroup(groupId: String, consumerId: String, isCoordinatorForGroup: Boolean): LeaveGroupCallbackParams = { + val (responseFuture, responseCallback) = setupHeartbeatCallback + EasyMock.expect(offsetManager.partitionFor(groupId)).andReturn(1) + EasyMock.expect(offsetManager.leaderIsLocal(1)).andReturn(isCoordinatorForGroup) + EasyMock.replay(offsetManager) + consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback) + Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) + } +}
