Repository: kafka Updated Branches: refs/heads/trunk 421de0a3f -> aa73554c1
KAFKA-2737: Added single- and multi-consumer integration tests for round-robin assignment Two tests: 1. One consumer subscribes to 2 topics, each with 2 partitions; includes adding and removing a topic. 2. Several consumers subscribe to 2 topics, several partitions each; includes adding one more consumer after initial assignment is done and verified. Author: Anna Povzner <[email protected]> Reviewers: Guozhang Wang Closes #413 from apovzner/cpkafka-76 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa73554c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa73554c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa73554c Branch: refs/heads/trunk Commit: aa73554c19b15a1589a77a4fae85c2d66a649acf Parents: 421de0a Author: Anna Povzner <[email protected]> Authored: Wed Nov 4 10:11:31 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 4 10:11:31 2015 -0800 ---------------------------------------------------------------------- .../kafka/api/BaseConsumerTest.scala | 54 ++++++++- .../kafka/api/PlaintextConsumerTest.scala | 117 ++++++++++++++++++- 2 files changed, 169 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aa73554c/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 9487c77..2e674af 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -19,13 +19,14 @@ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.ByteArrayDeserializer import 
org.apache.kafka.common.{PartitionInfo, TopicPartition} -import kafka.utils.{TestUtils, Logging} +import kafka.utils.{TestUtils, Logging, ShutdownableThread} import kafka.server.KafkaConfig import java.util.ArrayList import org.junit.Assert._ import org.junit.{Test, Before} +import scala.collection.mutable.Buffer import scala.collection.JavaConverters._ import kafka.coordinator.GroupCoordinator @@ -315,4 +316,55 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1 } + protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]]) extends ShutdownableThread("daemon-consumer-assignment", false) + { + @volatile private var partitionAssignment: Set[TopicPartition] = Set.empty[TopicPartition] + + def consumerAssignment(): Set[TopicPartition] = { + partitionAssignment + } + + override def doWork(): Unit = { + consumer.poll(50) + if (consumer.assignment() != partitionAssignment.asJava) { + partitionAssignment = collection.immutable.Set(consumer.assignment().asScala.toArray: _*) + } + Thread.sleep(100L) + } + } + + /** + * Check whether partition assignment is valid + * Assumes partition assignment is valid iff + * 1. Every consumer got assigned at least one partition + * 2. Each partition is assigned to only one consumer + * 3. 
Every partition is assigned to one of the consumers + * + * @param assignments set of consumer assignments; one per each consumer + * @param partitions set of partitions that consumers subscribed to + * @return true if partition assignment is valid + */ + def isPartitionAssignmentValid(assignments: Buffer[Set[TopicPartition]], + partitions: Set[TopicPartition]): Boolean = { + val allNonEmptyAssignments = assignments forall (assignment => assignment.size > 0) + if (!allNonEmptyAssignments) { + // at least one consumer got empty assignment + return false + } + + // make sure that sum of all partitions to all consumers equals total number of partitions + val totalPartitionsInAssignments = (0 /: assignments) (_ + _.size) + if (totalPartitionsInAssignments != partitions.size) { + // either same partitions got assigned to more than one consumer or some + // partitions were not assigned + return false + } + + // The above checks could miss the case where one or more partitions were assigned to more + // than one consumer and the same number of partitions were missing from assignments. 
+ // Make sure that all unique assignments are the same as 'partitions' + val uniqueAssignedPartitions = (Set[TopicPartition]() /: assignments) (_ ++ _) + uniqueAssignedPartitions == partitions + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/aa73554c/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 6c7a653..2e7471c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -15,13 +15,14 @@ package kafka.api import java.util.regex.Pattern import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig} +import org.apache.kafka.clients.consumer.{NoOffsetForPartitionException, OffsetAndMetadata, KafkaConsumer, ConsumerConfig, RoundRobinAssignor} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordTooLargeException} import org.junit.Assert._ import org.junit.Test +import scala.collection.mutable.Buffer import scala.collection.JavaConverters import JavaConverters._ @@ -366,4 +367,118 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer0.close() } + + @Test + def testRoundRobinAssignment() { + // 1 consumer using round-robin assignment + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName) + val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), 
new ByteArrayDeserializer()) + + // create two new topics, each having 2 partitions + val topic1 = "topic1" + val topic2 = "topic2" + val expectedAssignment = createTopicAndSendRecords(topic1, 2, 100) ++ createTopicAndSendRecords(topic2, 2, 100) + + assertEquals(0, consumer0.assignment().size) + + // subscribe to two topics + consumer0.subscribe(List(topic1, topic2).asJava) + TestUtils.waitUntilTrue(() => { + consumer0.poll(50) + consumer0.assignment() == expectedAssignment.asJava + }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer0.assignment()}") + + // add one more topic with 2 partitions + val topic3 = "topic3" + createTopicAndSendRecords(topic3, 2, 100) + + val newExpectedAssignment = expectedAssignment ++ Set(new TopicPartition(topic3, 0), new TopicPartition(topic3, 1)) + consumer0.subscribe(List(topic1, topic2, topic3).asJava) + TestUtils.waitUntilTrue(() => { + consumer0.poll(50) + consumer0.assignment() == newExpectedAssignment.asJava + }, s"Expected partitions ${newExpectedAssignment.asJava} but actually got ${consumer0.assignment()}") + + // remove the topic we just added + consumer0.subscribe(List(topic1, topic2).asJava) + TestUtils.waitUntilTrue(() => { + consumer0.poll(50) + consumer0.assignment() == expectedAssignment.asJava + }, s"Expected partitions ${expectedAssignment.asJava} but actually got ${consumer0.assignment()}") + + consumer0.unsubscribe() + assertEquals(0, consumer0.assignment().size) + } + + @Test + def testMultiConsumerRoundRobinAssignment() { + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName) + + val consumerCount = 10 + val rrConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() + for (i <- 0 until consumerCount) { + rrConsumers += new KafkaConsumer(this.consumerConfig) + } + + // create two new topics, total number of partitions must be 
greater than number of consumers + val topic1 = "topic1" + val topic2 = "topic2" + val subscriptions = createTopicAndSendRecords(topic1, 5, 100) ++ createTopicAndSendRecords(topic2, 8, 100) + + // all consumers subscribe to all the topics and start polling + // for the topic partition assignment + val consumerPollers = Buffer[ConsumerAssignmentPoller]() + for (consumer <- rrConsumers) { + assertEquals(0, consumer.assignment().size) + consumer.subscribe(List(topic1, topic2).asJava) + val poller = new ConsumerAssignmentPoller(consumer) + consumerPollers += poller + poller.start() + } + + TestUtils.waitUntilTrue(() => { + val assignments = Buffer[Set[TopicPartition]]() + consumerPollers.foreach(assignments += _.consumerAssignment()) + isPartitionAssignmentValid(assignments, subscriptions) + }, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + + // add one more consumer + val newConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) + newConsumer.subscribe(List(topic1, topic2).asJava) + val newPoller = new ConsumerAssignmentPoller(newConsumer) + rrConsumers += newConsumer + consumerPollers += newPoller + newPoller.start() + + // wait until topics get re-assigned + TestUtils.waitUntilTrue(() => { + val assignments = Buffer[Set[TopicPartition]]() + consumerPollers.foreach(assignments += _.consumerAssignment()) + isPartitionAssignmentValid(assignments, subscriptions) + }, s"Did not get valid assignment for partitions ${subscriptions.asJava} after we added one more consumer") + + for (poller <- consumerPollers) + poller.shutdown() + + for (consumer <- rrConsumers) { + consumer.unsubscribe() + } + } + + /** + * Creates topic 'topicName' with 'numPartitions' partitions and produces 'recordsPerPartition' + * records to each partition + */ + def createTopicAndSendRecords(topicName: String, numPartitions: Int, recordsPerPartition: Int): Set[TopicPartition] = { + TestUtils.createTopic(this.zkUtils, topicName, numPartitions, 
serverCount, this.servers) + var parts = Set[TopicPartition]() + for (partition <- 0 until numPartitions) { + val tp = new TopicPartition(topicName, partition) + sendRecords(recordsPerPartition, tp) + parts = parts + tp + } + parts + } }
