Repository: kafka
Updated Branches:
  refs/heads/trunk ef5867dce -> 48c2160c7


MINOR: Wait for tasks to terminate to avoid exception in test teardown

Author: Rajini Sivaram <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #3483 from rajinisivaram/MINOR-consumerbounce-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48c2160c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48c2160c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48c2160c

Branch: refs/heads/trunk
Commit: 48c2160c75e24bae61133eb932a790fdf80e7ca9
Parents: ef5867d
Author: Rajini Sivaram <[email protected]>
Authored: Wed Jul 5 22:05:27 2017 +0100
Committer: Rajini Sivaram <[email protected]>
Committed: Wed Jul 5 22:05:27 2017 +0100

----------------------------------------------------------------------
 .../src/test/scala/integration/kafka/api/ConsumerBounceTest.scala | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/48c2160c/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index a06cc29..2fa4d15 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -76,6 +76,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with 
Logging {
   override def tearDown() {
     try {
       executor.shutdownNow()
+      // Wait for any active tasks to terminate to ensure consumer is not 
closed while being used from another thread
+      assertTrue("Executor did not terminate", executor.awaitTermination(5000, 
TimeUnit.MILLISECONDS))
     } finally {
       super.tearDown()
     }
@@ -339,6 +341,7 @@ class ConsumerBounceTest extends IntegrationTestHarness 
with Logging {
     waitForRebalance(2000, rebalanceFuture, consumer2)
 
     // Trigger another rebalance and shutdown all brokers
+    // This consumer poll() doesn't complete and `tearDown` shuts down the 
executor and closes the consumer
     createConsumerToRebalance()
     servers.foreach(server => killBroker(server.config.brokerId))
 

Reply via email to