Repository: kafka Updated Branches: refs/heads/trunk 157fba840 -> 9e5d481c7
KAFKA-1416; Unify sendMessages in TestUtils; reviewed by Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9e5d481c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9e5d481c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9e5d481c Branch: refs/heads/trunk Commit: 9e5d481c7ccb5ef82196b7093f3b916194bdd90d Parents: 157fba8 Author: Flutra Osmani <[email protected]> Authored: Tue Apr 14 14:42:53 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue Apr 14 14:42:53 2015 -0700 ---------------------------------------------------------------------- .../ZookeeperConsumerConnectorTest.scala | 58 ++++----- .../unit/kafka/integration/FetcherTest.scala | 19 +-- .../integration/UncleanLeaderElectionTest.scala | 45 +++---- .../ZookeeperConsumerConnectorTest.scala | 54 +++----- .../scala/unit/kafka/metrics/MetricsTest.scala | 4 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 122 +++++++++++-------- 6 files changed, 139 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f3e76db..7f9fca3 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -79,7 +79,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // also the iterator should support re-entrant, so loop it twice for (i <- 0 until 2) { try { - getMessages(nMessages*2, topicMessageStreams0) + getMessages(topicMessageStreams0, nMessages * 2) fail("should get an exception") } catch { case e: ConsumerTimeoutException => // this is ok @@ -90,8 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkConsumerConnector0.shutdown // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ - sendMessagesToPartition(servers, topic, 1, nMessages) + val sentMessages1 = sendMessages(servers, topic, nMessages, 0) ++ + sendMessages(servers, topic, nMessages, 1) // wait to make sure the topic and partition have a leader for the successful case waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) @@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -124,13 +124,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ - sendMessagesToPartition(servers, topic, 1, nMessages) + val sentMessages2 = sendMessages(servers, topic, nMessages, 0) ++ + sendMessages(servers, topic, nMessages, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -145,13 +145,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages) ++ - sendMessagesToPartition(servers, topic, 1, nMessages) + val sentMessages3 = sendMessages(servers, topic, nMessages, 0) ++ + sendMessages(servers, topic, nMessages, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -179,8 +179,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages1 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -193,7 +193,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages * 2) assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership @@ -212,13 +212,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages2 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages2 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership @@ -233,13 +233,13 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages3 = sendMessagesToPartition(servers, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages3 = sendMessages(servers, topic, nMessages, 0, GZIPCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) - val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + val receivedMessages3 = getMessages(topicMessageStreams1, nMessages) ++ getMessages(topicMessageStreams2, nMessages) assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership @@ -255,8 +255,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages = sendMessagesToPartition(servers, topic, 0, 200, DefaultCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, 200, DefaultCompressionCodec) + val sentMessages = sendMessages(servers, topic, 200, 0, DefaultCompressionCodec) ++ + sendMessages(servers, topic, 200, 1, DefaultCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -264,7 +264,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages = getMessages(400, topicMessageStreams1) + val receivedMessages = getMessages(topicMessageStreams1, 400) assertEquals(sentMessages.sorted, receivedMessages.sorted) // also check partition ownership @@ -281,8 +281,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages = sendMessagesToPartition(servers, topic, 0, nMessages, NoCompressionCodec) ++ - sendMessagesToPartition(servers, topic, 1, nMessages, NoCompressionCodec) + val sentMessages = sendMessages(servers, topic, nMessages, 0, NoCompressionCodec) ++ + sendMessages(servers, topic, nMessages, 1, NoCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -322,7 +322,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker - val sentMessages1 = sendMessages(servers, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(servers, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -340,7 +340,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val expected_1 = List( ("0", "group1_consumer1-0")) assertEquals(expected_1, actual_1) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) assertEquals(sentMessages1, receivedMessages1) zkConsumerConnector1.shutdown() zkClient.close() @@ -348,8 +348,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testConsumerRebalanceListener() { // Send messages to create topic - sendMessagesToPartition(servers, topic, 0, nMessages) - sendMessagesToPartition(servers, topic, 1, nMessages) + sendMessages(servers, topic, nMessages, 0) + sendMessages(servers, topic, nMessages, 1) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) @@ -385,7 +385,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // Consume messages from consumer 1 to make sure it has finished rebalance - getMessages(nMessages, topicMessageStreams1) + getMessages(topicMessageStreams1, nMessages) val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_2 = List(("0", "group1_consumer1-0"), http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 0dc837a..facebd8 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -66,32 +66,17 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def testFetcher() { val perNode = 2 - var count = sendMessages(perNode) + var count = TestUtils.sendMessages(servers, topic, perNode).size fetch(count) assertQueueEmpty() - count = sendMessages(perNode) + count = TestUtils.sendMessages(servers, topic, perNode).size fetch(count) assertQueueEmpty() } def assertQueueEmpty(): Unit = assertEquals(0, queue.size) - def sendMessages(messagesPerNode: Int): Int = { - var count = 0 - for(conf <- configs) { - val producer: Producer[String, Array[Byte]] = TestUtils.createProducer( - TestUtils.getBrokerListStrFromServers(servers), - keyEncoder = classOf[StringEncoder].getName) - val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray - messages += conf.brokerId -> ms - producer.send(ms.map(m => new KeyedMessage[String, Array[Byte]](topic, topic, m)):_*) - producer.close() - count += ms.size - } - count - } - def fetch(expected: Int) { var count = 0 while(true) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index a130089..5b7b529 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -29,7 +29,7 @@ import kafka.admin.AdminUtils import kafka.common.FailedToSendMessageException import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException} import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.StringEncoder +import kafka.serializer.StringDecoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.CoreUtils import kafka.utils.TestUtils._ @@ -175,14 +175,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(topic, "first") + sendMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(topic, "second") + sendMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -192,7 +192,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId)) - produceMessage(topic, "third") + sendMessage(servers, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic)) @@ -210,14 +210,14 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - produceMessage(topic, "first") + sendMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - produceMessage(topic, "second") + sendMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -229,7 +229,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // message production and consumption should both fail while leader is down intercept[FailedToSendMessageException] { - produceMessage(topic, "third") + sendMessage(servers, topic, "third") } assertEquals(List.empty[String], consumeAllMessages(topic)) @@ -237,7 +237,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId)) - produceMessage(topic, "third") + sendMessage(servers, topic, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) @@ -253,33 +253,16 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { server.awaitShutdown() } - private def produceMessage(topic: String, message: String) = { - val producer: Producer[String, Array[Byte]] = createProducer( - getBrokerListStrFromServers(servers), - keyEncoder = classOf[StringEncoder].getName) - producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, message.getBytes)) - producer.close() - } - private def consumeAllMessages(topic: String) : List[String] = { // use a fresh consumer group every time so that we don't need to mess with disabling auto-commit or // resetting the ZK offset val consumerProps = createConsumerProperties(zkConnect, "group" + random.nextLong, "id", 1000) val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps)) - val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head - - val messages = new MutableList[String] - val iter = messageStream.iterator - try { - while(iter.hasNext()) { - messages += new String(iter.next.message) // will throw a timeout exception if the message isn't there - } - } catch { - case e: ConsumerTimeoutException => - debug("consumer timed out after receiving " + messages.length + " message(s).") - } finally { - consumerConnector.shutdown - } - messages.toList + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + + val messages = getMessages(messageStream) + consumerConnector.shutdown + + messages } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index ad66bb2..74c761d 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -60,7 +60,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar TestUtils.createTopic(zkClient, topic, numParts, 1, servers) // send some messages to each broker - val sentMessages1 = sendMessages(nMessages, "batch1") + val sentMessages1 = sendMessages(servers, nMessages, "batch1") // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -82,32 +82,24 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(conf: KafkaConfig, + def sendMessages(servers: Seq[KafkaServer], messagesPerNode: Int, - header: String, - compressed: CompressionCodec): List[String] = { + header: String): List[String] = { var messages: List[String] = Nil - val producer: kafka.producer.Producer[Int, String] = - TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) - val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) - messages ++= ms - import JavaConversions._ - javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + for(server <- servers) { + val producer: kafka.producer.Producer[Int, String] = + TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName) + val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) + for (partition <- 0 until numParts) { + val ms = 0.until(messagesPerNode).map(x => header + server.config.brokerId + "-" + partition + "-" + x) + messages ++= ms + import JavaConversions._ + javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) + } + javaProducer.close } - javaProducer.close - messages - } - - def sendMessages(messagesPerNode: Int, - header: String, - compressed: CompressionCodec = NoCompressionCodec): List[String] = { - var messages: List[String] = Nil - for(conf <- configs) - messages ++= sendMessages(conf, messagesPerNode, header, compressed) messages } @@ -115,18 +107,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { var messages: List[String] = Nil import scala.collection.JavaConversions._ - val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams - for ((topic, messageStreams) <- topicMessageStreams) { - for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) - } - } - } + val topicMessageStreams = jTopicMessageStreams.mapValues(_.toList) + messages = TestUtils.getMessages(topicMessageStreams, nMessagesPerThread) messages } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index 247a6e9..b42101b 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -77,12 +77,12 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { } def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { - val sentMessages1 = sendMessages(servers, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(servers, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) zkConsumerConnector1.shutdown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9e5d481c/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5a9e84d..8dc99b6 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -36,7 +36,7 @@ import kafka.producer._ import kafka.message._ import kafka.api._ import kafka.cluster.Broker -import kafka.consumer.{KafkaStream, ConsumerConfig} +import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils @@ -745,71 +745,97 @@ object TestUtils extends Logging { time = time, brokerState = new BrokerState()) } - - def sendMessagesToPartition(servers: Seq[KafkaServer], - topic: String, - partition: Int, - numMessages: Int, - compression: CompressionCodec = NoCompressionCodec): List[String] = { + def sendMessages(servers: Seq[KafkaServer], + topic: String, + numMessages: Int, + partition: Int = -1, + compression: CompressionCodec = NoCompressionCodec): List[String] = { val header = "test-%d".format(partition) val props = new Properties() props.put("compression.codec", compression.codec.toString) - val producer: Producer[Int, String] = - createProducer(TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) - val ms = 0.until(numMessages).map(x => header + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) - producer.close() - ms.toList - } - def sendMessages(servers: Seq[KafkaServer], - topic: String, - producerId: String, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, - numParts: Int): List[String]= { - var messages: List[String] = Nil - val props = new Properties() - props.put("compression.codec", compression.codec.toString) - props.put("client.id", producerId) - val producer: Producer[Int, String] = - createProducer(brokerList = TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) + // Specific Partition + if (partition >= 0) { + val producer: Producer[Int, String] = + createProducer(TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName, + partitioner = classOf[FixedValuePartitioner].getName, + producerProps = props) - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x) producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - messages ++= ms debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) + producer.close() + ms.toList + } else { + // Use topic as the key to determine partition + val producer: Producer[String, String] = createProducer( + TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[StringEncoder].getName, + partitioner = classOf[DefaultPartitioner].getName, + producerProps = props) + producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*) + producer.close() + debug("Sent %d messages for topic [%s]".format(ms.size, topic)) + ms.toList } + + } + + def sendMessage(servers: Seq[KafkaServer], + topic: String, + message: String) = { + + val producer: Producer[String, String] = + createProducer(TestUtils.getBrokerListStrFromServers(servers), + encoder = classOf[StringEncoder].getName(), + keyEncoder = classOf[StringEncoder].getName()) + + producer.send(new KeyedMessage[String, String](topic, topic, message)) producer.close() - messages } - def getMessages(nMessagesPerThread: Int, - topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = { + /** + * Consume all messages (or a specific number of messages) + * @param topicMessageStreams the Topic Message Streams + * @param nMessagesPerThread an optional field to specify the exact number of messages to be returned. + * ConsumerTimeoutException will be thrown if there are no messages to be consumed. + * If not specified, then all available messages will be consumed, and no exception is thrown. + * + * + * @return the list of messages consumed. + */ + def getMessages(topicMessageStreams: Map[String, List[KafkaStream[String, String]]], + nMessagesPerThread: Int = -1): List[String] = { + var messages: List[String] = Nil + val shouldGetAllMessages = nMessagesPerThread < 0 for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) + val iterator = messageStream.iterator() + try { + var i = 0 + while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread)) { + assertTrue(iterator.hasNext) + val message = iterator.next.message // will throw a timeout exception if the message isn't there + messages ::= message + debug("received message: " + message) + i += 1 + } + } catch { + case e: ConsumerTimeoutException => + if (shouldGetAllMessages) { + // swallow the exception + debug("consumer timed out after receiving " + messages.length + " message(s).") + } else { + throw e + } } } } + messages.reverse }
