[
https://issues.apache.org/jira/browse/KAFKA-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kostya Golikov updated KAFKA-1079:
----------------------------------
Fix Version/s: 0.8.1
Status: Patch Available (was: Open)
>From 7845a5af42ee44f9786542178c0789d93c66429f Mon Sep 17 00:00:00 2001
From: Kostya Golikov <[email protected]>
Date: Thu, 24 Oct 2013 00:48:56 +0400
Subject: [PATCH] Fixed liars in primary api test: now checking is done not
only against no-compression producer, but against with-compression producer
as well.
---
.../unit/kafka/integration/PrimitiveApiTest.scala | 141 ++++++---------------
1 file changed, 40 insertions(+), 101 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 5f331d2..c001b4e 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -35,12 +35,12 @@ import kafka.utils.{TestUtils, Utils}
* End to end tests of the primitive apis against a local server
*/
class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness
with ZooKeeperTestHarness {
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
- val port = TestUtils.choosePort
+ val port = TestUtils.choosePort()
val props = TestUtils.createBrokerConfig(0, port)
val config = new KafkaConfig(props)
val configs = List(config)
- val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
def testFetchRequestCanProperlySerialize() {
val request = new FetchRequestBuilder()
@@ -100,7 +100,7 @@ class PrimitiveApiTest extends JUnit3Suite with
ProducerConsumerTestHarness with
val stringProducer1 = new Producer[String, String](config)
stringProducer1.send(new KeyedMessage[String, String](topic,
"test-message"))
- var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0,
0, 10000).build())
+ val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0,
0, 10000).build())
val messageSet = fetched.messageSet(topic, 0)
assertTrue(messageSet.iterator.hasNext)
@@ -108,8 +108,8 @@ class PrimitiveApiTest extends JUnit3Suite with
ProducerConsumerTestHarness with
assertEquals("test-message",
Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
}
- def testProduceAndMultiFetch() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3",
"test4"), config.brokerId)
+ private def produceAndMultiFetch(producer: Producer[String, String]) {
+ createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3",
"test4"))
// send some messages
val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -171,117 +171,56 @@ class PrimitiveApiTest extends JUnit3Suite with
ProducerConsumerTestHarness with
requestHandlerLogger.setLevel(Level.ERROR)
}
- def testProduceAndMultiFetchWithCompression() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3",
"test4"), config.brokerId)
-
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
- {
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- for( (topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String,
String](topic, topic, _))
- messages += topic -> messageList
- producer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
-
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
- val fetched = response.messageSet(topic, partition)
- assertEquals(messages(topic), fetched.map(messageAndOffset =>
Utils.readString(messageAndOffset.message.payload)))
- }
- }
-
- // temporarily set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.FATAL)
-
- {
- // send some invalid offsets
- val builder = new FetchRequestBuilder()
- for( (topic, partition) <- topics)
- builder.addFetch(topic, partition, -1, 10000)
-
- try {
- val request = builder.build()
- val response = consumer.fetch(request)
- response.data.values.foreach(pdata =>
ErrorMapping.maybeThrowException(pdata.error))
- fail("Expected exception when fetching message with invalid offset")
- } catch {
- case e: OffsetOutOfRangeException => "this is good"
- }
- }
-
- {
- // send some invalid partitions
- val builder = new FetchRequestBuilder()
- for( (topic, _) <- topics)
- builder.addFetch(topic, -1, 0, 10000)
-
- try {
- val request = builder.build()
- val response = consumer.fetch(request)
- response.data.values.foreach(pdata =>
ErrorMapping.maybeThrowException(pdata.error))
- fail("Expected exception when fetching message with invalid partition")
- } catch {
- case e: UnknownTopicOrPartitionException => "this is good"
- }
- }
+ def testProduceAndMultiFetch() {
+ val props = producer.config.props.props
+ val config = new ProducerConfig(props)
+ val noCompressionProducer = new Producer[String, String](config)
+ produceAndMultiFetch(noCompressionProducer)
+ }
- // restore set request handler logger to a higher level
- requestHandlerLogger.setLevel(Level.ERROR)
+ def testProduceAndMultiFetchWithCompression() {
+ val props = producer.config.props.props
+ props.put("compression", "true")
+ val config = new ProducerConfig(props)
+ val producerWithCompression = new Producer[String, String](config)
+ produceAndMultiFetch(producerWithCompression)
}
- def testMultiProduce() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3",
"test4"), config.brokerId)
+ private def multiProduce(producer: Producer[String, String]) {
+ val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+ createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
- for( (topic, partition) <- topics) {
+ for((topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String,
String](topic, topic, _))
messages += topic -> messageList
producer.send(producerData:_*)
builder.addFetch(topic, partition, 0, 10000)
}
- producer.send(produceList: _*)
val request = builder.build()
val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
+ for((topic, partition) <- topics) {
val fetched = response.messageSet(topic, partition)
assertEquals(messages(topic), fetched.map(messageAndOffset =>
Utils.readString(messageAndOffset.message.payload)))
}
}
- def testMultiProduceWithCompression() {
- // send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
- val messages = new mutable.HashMap[String, Seq[String]]
- val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
- for( (topic, partition) <- topics) {
- val messageList = List("a_" + topic, "b_" + topic)
- val producerData = messageList.map(new KeyedMessage[String,
String](topic, topic, _))
- messages += topic -> messageList
- producer.send(producerData:_*)
- builder.addFetch(topic, partition, 0, 10000)
- }
- producer.send(produceList: _*)
+ def testMultiProduce() {
+ val props = producer.config.props.props
+ val config = new ProducerConfig(props)
+ val noCompressionProducer = new Producer[String, String](config)
+ multiProduce(noCompressionProducer)
+ }
- // wait a bit for produced message to be available
- val request = builder.build()
- val response = consumer.fetch(request)
- for( (topic, partition) <- topics) {
- val fetched = response.messageSet(topic, 0)
- assertEquals(messages(topic), fetched.map(messageAndOffset =>
Utils.readString(messageAndOffset.message.payload)))
- }
+ def testMultiProduceWithCompression() {
+ val props = producer.config.props.props
+ props.put("compression", "true")
+ val config = new ProducerConfig(props)
+ val producerWithCompression = new Producer[String, String](config)
+ multiProduce(producerWithCompression)
}
def testConsumerEmptyTopic() {
@@ -294,16 +233,15 @@ class PrimitiveApiTest extends JUnit3Suite with
ProducerConsumerTestHarness with
}
def testPipelinedProduceRequests() {
- createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3",
"test4"), config.brokerId)
+ val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
+ createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
val props = producer.config.props.props
props.put("request.required.acks", "0")
val pipelinedProducer: Producer[String, String] = new Producer(new
ProducerConfig(props))
// send some messages
- val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
val messages = new mutable.HashMap[String, Seq[String]]
val builder = new FetchRequestBuilder()
- var produceList: List[KeyedMessage[String, String]] = Nil
for( (topic, partition) <- topics) {
val messageList = List("a_" + topic, "b_" + topic)
val producerData = messageList.map(new KeyedMessage[String,
String](topic, topic, _))
@@ -338,10 +276,11 @@ class PrimitiveApiTest extends JUnit3Suite with
ProducerConsumerTestHarness with
* For testing purposes, just create these topics each with one partition
and one replica for
* which the provided broker should the leader for. Create and wait for
broker to lead. Simple.
*/
- def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics:
Seq[String], brokerId: Int) {
+ private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics:
Iterable[String]) {
for( topic <- topics ) {
- AdminUtils.createTopic(zkClient, topic, 1, 1)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ AdminUtils.deleteTopic(zkClient, topic)
+ AdminUtils.createTopic(zkClient, topic, partitions = 1,
replicationFactor = 1)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition =
0, timeoutMs = 500)
}
}
}
--
1.8.5.1
> Liars in PrimitiveApiTest that promise to test api in compression mode, but
> don't do this actually
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1079
> URL: https://issues.apache.org/jira/browse/KAFKA-1079
> Project: Kafka
> Issue Type: Test
> Components: core
> Affects Versions: 0.8.0
> Reporter: Kostya Golikov
> Priority: Minor
> Labels: newbie, test
> Fix For: 0.8.1
>
> Attachments: testing-with-compression-producer.patch
>
>
> Long time ago (0.7) we had ByteBufferMessageSet as a part of api and it's
> allowed us to control compression. Times goes on and now PrimitiveApiTest
> have methods that promise to test api with compression enabled, but in fact
> they don't. Moreover this methods almost entirely copy their counterparts
> without compression. In particular I'm talking about
> `testProduceAndMultiFetch` / `testProduceAndMultiFetchWithCompression` and
> `testMultiProduce`/`testMultiProduceWithCompression` pairs.
> The fix could be super-easy and soundness -- just parameterize methods with
> producer of each type (with/without compression). Sadly but it isn't feasible
> for junit3, so straightforward solution is to do the same ugly thing as
> `testDefaultEncoderProducerAndFetchWithCompression` method does -- forget
> about class-wide producer and roll-out it's own. I will attach path if that
> is a problem indeed.
--
This message was sent by Atlassian JIRA
(v6.1.4#6159)