Repository: kafka Updated Branches: refs/heads/trunk 4fa456bc6 -> 065ddf901
KAFKA-3549: Close consumers instantiated in consumer tests Author: Grant Henke <[email protected]> Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1217 from granthenke/close-consumers Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/065ddf90 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/065ddf90 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/065ddf90 Branch: refs/heads/trunk Commit: 065ddf90195e09689512b55d0718a5ebdb3d42ad Parents: 4fa456b Author: Grant Henke <[email protected]> Authored: Thu Apr 14 22:02:19 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Apr 14 22:02:19 2016 -0700 ---------------------------------------------------------------------- .../clients/consumer/KafkaConsumerTest.java | 13 ++++++--- .../kafka/api/BaseConsumerTest.scala | 26 +++++++++--------- .../kafka/api/PlaintextConsumerTest.scala | 28 +++++++++++--------- .../kafka/streams/perf/SimpleBenchmark.java | 1 + .../streams/smoketest/SmokeTestDriver.java | 2 +- 5 files changed, 40 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index ff07461..2272795 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -77,6 +77,8 @@ public class KafkaConsumerTest { consumer.unsubscribe(); Assert.assertTrue(consumer.subscription().isEmpty()); Assert.assertTrue(consumer.assignment().isEmpty()); + + consumer.close(); } @Test(expected = IllegalArgumentException.class) @@ -85,10 +87,13 @@ public class KafkaConsumerTest { props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); - KafkaConsumer<byte[], byte[]> consumer = newConsumer(); - consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0))); - consumer.seek(new TopicPartition("nonExistTopic", 0), -1); + try { + consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0))); + consumer.seek(new TopicPartition("nonExistTopic", 0), -1); + } finally { + consumer.close(); + } } @Test @@ -129,6 +134,8 @@ public class KafkaConsumerTest { consumer.unsubscribe(); Assert.assertTrue(consumer.paused().isEmpty()); + + consumer.close(); } private KafkaConsumer<byte[], byte[]> newConsumer() { http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 1408cd9..916a0ab 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -13,7 +13,6 @@ package kafka.api import java.util -import kafka.coordinator.GroupCoordinator import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.record.TimestampType @@ -92,6 +91,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 val numRecords = 10000 sendRecords(numRecords) @@ -184,6 +184,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 + consumer0.subscribe(List(topic).asJava, listener) // the initial subscription should cause a callback execution @@ -209,8 +211,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { // only expect one revocation since revoke is not invoked on initial membership assertEquals(2, listener.callsToRevoked) - - consumer0.close() } @Test @@ -219,20 +219,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 - try { - val listener = new TestConsumerReassignmentListener() - consumer0.subscribe(List(topic).asJava, listener) + val listener = new TestConsumerReassignmentListener() + consumer0.subscribe(List(topic).asJava, listener) - // the initial subscription should cause a callback execution - while (listener.callsToAssigned == 0) - consumer0.poll(50) + // the initial subscription should cause a callback execution + while (listener.callsToAssigned == 0) + consumer0.poll(50) - consumer0.subscribe(List[String]().asJava) - assertEquals(0, consumer0.assignment.size()) - } finally { - consumer0.close() - } + consumer0.subscribe(List[String]().asJava) + assertEquals(0, consumer0.assignment.size()) } @Test @@ -240,6 +237,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 sendRecords(5) consumer0.subscribe(List(topic).asJava) http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index ff2e63d..349f7ad 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -48,6 +48,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString) val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 + consumer0.assign(List(tp).asJava) consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0, @@ -405,6 +407,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { def testFetchInvalidOffset() { this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 // produce one record val totalRecords = 2 @@ -426,8 +429,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertNotNull(outOfRangePartitions) assertEquals(1, outOfRangePartitions.size) assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp)) - - consumer0.close() } @Test @@ -435,6 +436,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { val maxFetchBytes = 10 * 1024 this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes.toString) val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 // produce a record that is larger than the configured fetch size val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1)) @@ -450,8 +452,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(1, oversizedPartitions.size) // the oversized message is at offset 0 assertEquals(0L, oversizedPartitions.get(tp)) - - consumer0.close() } @Test @@ -460,6 +460,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group") this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName) val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += consumer0 // create two new topics, each having 2 partitions val topic1 = "topic1" @@ -512,13 +513,13 @@ class PlaintextConsumerTest extends BaseConsumerTest { val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions) // add one more consumer and validate re-assignment - addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1, topic2), subscriptions) + addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic1, topic2), subscriptions) // done with pollers and consumers for (poller <- consumerPollers) poller.shutdown() - for (consumer <- rrConsumers) + for (consumer <- consumers) consumer.unsubscribe() } @@ -688,6 +689,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor") producerProps.put("mock.interceptor.append", appendStr) val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer()) + producers += testProducer // producing records should succeed testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes)) @@ -695,6 +697,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { // create consumer with interceptor that has different key and value types from the consumer this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor") val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer()) + consumers += testConsumer + testConsumer.assign(List(tp).asJava) testConsumer.seek(tp, 0) @@ -702,9 +706,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { val records = consumeRecords(testConsumer, 1) val record = records.get(0) assertEquals(s"value will not be modified", new String(record.value())) - - testConsumer.close() - testProducer.close() } def testConsumeMessagesWithCreateTime() { @@ -762,12 +763,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { // create one more consumer and add it to the group; we will timeout this consumer val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) - val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer) + // Close the consumer on test teardown, unless this test will manually + if(!closeConsumer) + consumers += timeoutConsumer val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1)) - val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller) + consumerPollers += timeoutPoller // validate the initial assignment - validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") + validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}") // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers timeoutPoller.shutdown() @@ -859,6 +862,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() for (i <- 0 until consumerCount) consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig) + consumers ++= consumerGroup // create consumer pollers, wait for assignment and validate it val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe, subscriptions) http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index c883090..a92fb1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -253,6 +253,7 @@ public class SimpleBenchmark { long endTime = System.currentTimeMillis(); + consumer.close(); System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java index d7b0139..205ba4b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java @@ -278,7 +278,7 @@ public class SmokeTestDriver extends SmokeTestUtil { } } } - + consumer.close(); System.out.println("-------------------"); System.out.println("Result Verification");
