Repository: kafka Updated Branches: refs/heads/trunk 6e747d429 -> 86eb74d92
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala new file mode 100644 index 0000000..0f3e748 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import org.junit.Assert._ +import org.junit.{Before, Test} +import org.scalatest.junit.JUnitSuite + +/** + * Test group state transitions and other GroupMetadata functionality + */ +class GroupMetadataTest extends JUnitSuite { + var group: GroupMetadata = null + + @Before + def setUp() { + group = new GroupMetadata("groupId", "consumer") + } + + @Test + def testCanRebalanceWhenStable() { + assertTrue(group.canRebalance) + } + + @Test + def testCanRebalanceWhenAwaitingSync(){ + group.transitionTo(PreparingRebalance) + group.transitionTo(AwaitingSync) + assertTrue(group.canRebalance) + } + + @Test + def testCannotRebalanceWhenPreparingRebalance() { + group.transitionTo(PreparingRebalance) + assertFalse(group.canRebalance) + } + + @Test + def testCannotRebalanceWhenDead() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + assertFalse(group.canRebalance) + } + + @Test + def testStableToPreparingRebalanceTransition() { + group.transitionTo(PreparingRebalance) + assertState(group, PreparingRebalance) + } + + @Test + def testAwaitingSyncToPreparingRebalanceTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(AwaitingSync) + group.transitionTo(PreparingRebalance) + assertState(group, PreparingRebalance) + } + + @Test + def testPreparingRebalanceToDeadTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + assertState(group, Dead) + } + + @Test + def testAwaitingSyncToStableTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(AwaitingSync) + group.transitionTo(Stable) + assertState(group, Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testStableToStableIllegalTransition() { + group.transitionTo(Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testStableToAwaitingSyncIllegalTransition() { + group.transitionTo(AwaitingSync) + } + + @Test(expected = classOf[IllegalStateException]) + def testStableToDeadIllegalTransition() { + group.transitionTo(Dead) + } + + @Test(expected = classOf[IllegalStateException]) + def testPreparingRebalanceToPreparingRebalanceIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(PreparingRebalance) + } + + @Test(expected = classOf[IllegalStateException]) + def testPreparingRebalanceToStableIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testAwaitingSyncToAwaitingSyncIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(AwaitingSync) + group.transitionTo(AwaitingSync) + } + + @Test(expected = classOf[IllegalStateException]) + def testAwaitingSyncToDeadIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(AwaitingSync) + group.transitionTo(Dead) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToDeadIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(Dead) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToStableIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(Stable) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToPreparingRebalanceIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(PreparingRebalance) + } + + @Test(expected = classOf[IllegalStateException]) + def testDeadToAwaitingSyncIllegalTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Dead) + group.transitionTo(AwaitingSync) + } + + @Test + def testSelectProtocol() { + val groupId = "groupId" + + val sessionTimeoutMs = 10000 + + val memberId = "memberId" + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, + List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) + + group.add(memberId, member) + assertEquals("range", group.selectProtocol) + + val otherMemberId = "otherMemberId" + val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs, + List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) + + group.add(otherMemberId, otherMember) + // now could be either range or robin since there is no majority preference + assertTrue(Set("range", "roundrobin")(group.selectProtocol)) + + val lastMemberId = "lastMemberId" + val lastMember = new MemberMetadata(lastMemberId, groupId, sessionTimeoutMs, + List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) + + group.add(lastMemberId, lastMember) + // now we should prefer 'roundrobin' + assertEquals("roundrobin", group.selectProtocol) + } + + @Test(expected = classOf[IllegalStateException]) + def testSelectProtocolRaisesIfNoMembers() { + group.selectProtocol + fail() + } + + @Test + def testSelectProtocolChoosesCompatibleProtocol() { + val groupId = "groupId" + + val sessionTimeoutMs = 10000 + + val memberId = "memberId" + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, + List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) + + val otherMemberId = "otherMemberId" + val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs, + List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) + + group.add(memberId, member) + group.add(otherMemberId, otherMember) + assertEquals("roundrobin", group.selectProtocol) + } + + @Test + def testSupportsProtocols() { + val groupId = "groupId" + + val sessionTimeoutMs = 10000 + + // by default, the group supports everything + assertTrue(group.supportsProtocols(Set("roundrobin", "range"))) + + val memberId = "memberId" + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, + List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) + + group.add(memberId, member) + assertTrue(group.supportsProtocols(Set("roundrobin", "foo"))) + assertTrue(group.supportsProtocols(Set("range", "foo"))) + assertFalse(group.supportsProtocols(Set("foo", "bar"))) + + val otherMemberId = "otherMemberId" + val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs, + List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) + + group.add(otherMemberId, otherMember) + + assertTrue(group.supportsProtocols(Set("roundrobin", "foo"))) + assertFalse(group.supportsProtocols(Set("range", "foo"))) + } + + private def assertState(group: GroupMetadata, targetState: GroupState) { + val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead) + val otherStates = states - targetState + otherStates.foreach { otherState => + assertFalse(group.is(otherState)) + } + assertTrue(group.is(targetState)) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala new file mode 100644 index 0000000..0a5bb3c --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator + +import java.util + +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.junit.JUnitSuite + +class MemberMetadataTest extends JUnitSuite { + + @Test + def testMatchesSupportedProtocols { + val groupId = "groupId" + val memberId = "memberId" + val sessionTimeoutMs = 10000 + val protocols = List(("range", Array.empty[Byte])) + + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + assertTrue(member.matches(protocols)) + assertFalse(member.matches(List(("range", Array[Byte](0))))) + assertFalse(member.matches(List(("roundrobin", Array.empty[Byte])))) + assertFalse(member.matches(List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))) + } + + @Test + def testVoteForPreferredProtocol { + val groupId = "groupId" + val memberId = "memberId" + val sessionTimeoutMs = 10000 + val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) + + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + assertEquals("range", member.vote(Set("range", "roundrobin"))) + assertEquals("roundrobin", member.vote(Set("blah", "roundrobin"))) + } + + @Test + def testMetadata { + val groupId = "groupId" + val memberId = "memberId" + val sessionTimeoutMs = 10000 + val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1))) + + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range"))) + assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin"))) + } + + @Test(expected = classOf[IllegalArgumentException]) + def testMetadataRaisesOnUnsupportedProtocol { + val groupId = "groupId" + val memberId = "memberId" + val sessionTimeoutMs = 10000 + val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) + + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + member.metadata("blah") + fail() + } + + @Test(expected = classOf[IllegalArgumentException]) + def testVoteRaisesOnNoSupportedProtocols { + val groupId = "groupId" + val memberId = "memberId" + val sessionTimeoutMs = 10000 + val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) + + val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + member.vote(Set("blah")) + fail() + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala deleted file mode 100644 index 79c691f..0000000 --- a/core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala +++ /dev/null @@ -1,305 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.common.TopicAndPartition - -import org.junit.Assert._ -import org.junit.Test -import org.scalatest.junit.JUnitSuite - -class PartitionAssignorTest extends JUnitSuite { - - @Test - def testRangeAssignorOneConsumerNoTopic() { - val consumer = "consumer" - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer -> Set.empty[String]) - val partitionsPerTopic = Map.empty[String, Int] - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> Set.empty[TopicAndPartition]) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorOneConsumerNonexistentTopic() { - val topic = "topic" - val consumer = "consumer" - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer -> Set(topic)) - val partitionsPerTopic = Map(topic -> 0) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> Set.empty[TopicAndPartition]) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorOneConsumerOneTopic() { - val topic = "topic" - val consumer = "consumer" - val numPartitions = 3 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer -> Set(topic)) - val partitionsPerTopic = Map(topic -> numPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> topicAndPartitions(Map(topic -> Set(0, 1, 2)))) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorOnlyAssignsPartitionsFromSubscribedTopics() { - val subscribedTopic = "topic" - val otherTopic = "other" - val consumer = "consumer" - val subscribedTopicNumPartitions = 3 - val otherTopicNumPartitions = 3 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer -> Set(subscribedTopic)) - val partitionsPerTopic = Map(subscribedTopic -> subscribedTopicNumPartitions, otherTopic -> otherTopicNumPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> topicAndPartitions(Map(subscribedTopic -> Set(0, 1, 2)))) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorOneConsumerMultipleTopics() { - val topic1 = "topic1" - val topic2 = "topic2" - val consumer = "consumer" - val numTopic1Partitions = 1 - val numTopic2Partitions = 2 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer -> Set(topic1, topic2)) - val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> topicAndPartitions(Map(topic1 -> Set(0), topic2 -> Set(0, 1)))) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorTwoConsumersOneTopicOnePartition() { - val topic = "topic" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val numPartitions = 1 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic), consumer2 -> Set(topic)) - val partitionsPerTopic = Map(topic -> numPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic -> Set(0))), - consumer2 -> Set.empty[TopicAndPartition]) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorTwoConsumersOneTopicTwoPartitions() { - val topic = "topic" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val numPartitions = 2 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic), consumer2 -> Set(topic)) - val partitionsPerTopic = Map(topic -> numPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic -> Set(0))), - consumer2 -> topicAndPartitions(Map(topic -> Set(1)))) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorMultipleConsumersMixedTopics() { - val topic1 = "topic1" - val topic2 = "topic2" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val consumer3 = "consumer3" - val numTopic1Partitions = 3 - val numTopic2Partitions = 2 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic1), consumer2 -> Set(topic1, topic2), consumer3 -> Set(topic1)) - val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic1 -> Set(0))), - consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 1))), - consumer3 -> topicAndPartitions(Map(topic1 -> Set(2)))) - assertEquals(expected, actual) - } - - @Test - def testRangeAssignorTwoConsumersTwoTopicsSixPartitions() { - val topic1 = "topic1" - val topic2 = "topic2" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val numTopic1Partitions = 3 - val numTopic2Partitions = 3 - val assignor = new RangeAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic1, topic2), consumer2 -> Set(topic1, topic2)) - val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic1 -> Set(0, 1), topic2 -> Set(0, 1))), - consumer2 -> topicAndPartitions(Map(topic1 -> Set(2), topic2 -> Set(2)))) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorOneConsumerNoTopic() { - val consumer = "consumer" - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer -> Set.empty[String]) - val partitionsPerTopic = Map.empty[String, Int] - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> Set.empty[TopicAndPartition]) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorOneConsumerNonexistentTopic() { - val topic = "topic" - val consumer = "consumer" - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer -> Set(topic)) - val partitionsPerTopic = Map(topic -> 0) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> Set.empty[TopicAndPartition]) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorOneConsumerOneTopic() { - val topic = "topic" - val consumer = "consumer" - val numPartitions = 3 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer -> Set(topic)) - val partitionsPerTopic = Map(topic -> numPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> topicAndPartitions(Map(topic -> Set(0, 1, 2)))) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorOnlyAssignsPartitionsFromSubscribedTopics() { - val subscribedTopic = "topic" - val otherTopic = "other" - val consumer = "consumer" - val subscribedTopicNumPartitions = 3 - val otherTopicNumPartitions = 3 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer -> Set(subscribedTopic)) - val partitionsPerTopic = Map(subscribedTopic -> subscribedTopicNumPartitions, otherTopic -> otherTopicNumPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> topicAndPartitions(Map(subscribedTopic -> Set(0, 1, 2)))) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorOneConsumerMultipleTopics() { - val topic1 = "topic1" - val topic2 = "topic2" - val consumer = "consumer" - val numTopic1Partitions = 1 - val numTopic2Partitions = 2 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer -> Set(topic1, topic2)) - val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map(consumer -> topicAndPartitions(Map(topic1 -> Set(0), topic2 -> Set(0, 1)))) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorTwoConsumersOneTopicOnePartition() { - val topic = "topic" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val numPartitions = 1 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic), consumer2 -> Set(topic)) - val partitionsPerTopic = Map(topic -> numPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic -> Set(0))), - consumer2 -> Set.empty[TopicAndPartition]) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorTwoConsumersOneTopicTwoPartitions() { - val topic = "topic" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val numPartitions = 2 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic), consumer2 -> Set(topic)) - val partitionsPerTopic = Map(topic -> numPartitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic -> Set(0))), - consumer2 -> topicAndPartitions(Map(topic -> Set(1)))) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorMultipleConsumersMixedTopics() { - val topic1 = "topic1" - val topic2 = "topic2" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val consumer3 = "consumer3" - val numTopic1Partitions = 3 - val numTopic2Partitions = 2 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic1), consumer2 -> Set(topic1, topic2), consumer3 -> Set(topic1)) - val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic1 -> Set(0))), - consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 1))), - consumer3 -> topicAndPartitions(Map(topic1 -> Set(2)))) - assertEquals(expected, actual) - } - - @Test - def testRoundRobinAssignorTwoConsumersTwoTopicsSixPartitions() { - val topic1 = "topic1" - val topic2 = "topic2" - val consumer1 = "consumer1" - val consumer2 = "consumer2" - val numTopic1Partitions = 3 - val numTopic2Partitions = 3 - val assignor = new RoundRobinAssignor() - val topicsPerConsumer = Map(consumer1 -> Set(topic1, topic2), consumer2 -> Set(topic1, topic2)) - val partitionsPerTopic = Map(topic1 -> numTopic1Partitions, topic2 -> numTopic2Partitions) - val actual = assignor.assign(topicsPerConsumer, partitionsPerTopic) - val expected = Map( - consumer1 -> topicAndPartitions(Map(topic1 -> Set(0, 2), topic2 -> Set(1))), - consumer2 -> topicAndPartitions(Map(topic1 -> Set(1), topic2 -> Set(0, 2)))) - assertEquals(expected, actual) - } - - private def topicAndPartitions(topicPartitions: Map[String, Set[Int]]): Set[TopicAndPartition] = { - topicPartitions.flatMap { case (topic, partitions) => - partitions.map(partition => TopicAndPartition(topic, partition)) - }.toSet - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 85252d0..6238f6d 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -206,8 +206,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val resourceToAcls = Map[Resource, Set[Acl]]( new Resource(Topic, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)), new Resource(Cluster, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, host1, Read)), - new Resource(ConsumerGroup, Resource.WildCardResource) -> acls, - new Resource(ConsumerGroup, "test-ConsumerGroup") -> acls + new Resource(Group, Resource.WildCardResource) -> acls, + new Resource(Group, "test-ConsumerGroup") -> acls ) resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index c9f2540..1c3e55d 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -465,8 +465,8 @@ class KafkaConfigTest { case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") - case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") - case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.GroupMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.GroupMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 7440500..4e5e776 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} +import kafka.api.{GroupMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} import kafka.consumer.SimpleConsumer import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} import kafka.utils._ @@ -56,7 +56,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { time = new MockTime() server = TestUtils.createServer(KafkaConfig.fromProps(config), time) simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client") - val consumerMetadataRequest = ConsumerMetadataRequest(group) + val consumerMetadataRequest = GroupMetadataRequest(group) Stream.continually { val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest) consumerMetadataResponse.coordinatorOpt.isDefined http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 46c88a3..ca17c6b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -49,12 +49,10 @@ import kafka.utils.ZkUtils._ import org.junit.Assert._ import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} -import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode import org.apache.kafka.common.security.ssl.SSLFactory -import org.apache.kafka.common.config.SSLConfigs import org.apache.kafka.test.TestSSLUtils import scala.collection.Map @@ -481,7 +479,7 @@ object TestUtils extends Logging { groupId: String, autoOffsetReset: String = "earliest", partitionFetchSize: Long = 4096L, - partitionAssignmentStrategy: String = "blah", + partitionAssignmentStrategy: String = classOf[RangeAssignor].getName, sessionTimeout: Int = 30000, securityProtocol: SecurityProtocol, trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = {
