Repository: kafka Updated Branches: refs/heads/trunk ef5867dce -> 48c2160c7
MINOR: Wait for tasks to terminate to avoid exception in test teardown Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3483 from rajinisivaram/MINOR-consumerbounce-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48c2160c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48c2160c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48c2160c Branch: refs/heads/trunk Commit: 48c2160c75e24bae61133eb932a790fdf80e7ca9 Parents: ef5867d Author: Rajini Sivaram <[email protected]> Authored: Wed Jul 5 22:05:27 2017 +0100 Committer: Rajini Sivaram <[email protected]> Committed: Wed Jul 5 22:05:27 2017 +0100 ---------------------------------------------------------------------- .../src/test/scala/integration/kafka/api/ConsumerBounceTest.scala | 3 +++ 1 file changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/48c2160c/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index a06cc29..2fa4d15 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -76,6 +76,8 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { override def tearDown() { try { executor.shutdownNow() + // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread + assertTrue("Executor did not terminate", executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) } finally { super.tearDown() } @@ -339,6 +341,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { waitForRebalance(2000, rebalanceFuture, consumer2) // Trigger another rebalance and shutdown all brokers + // This consumer poll() doesn't complete and `tearDown` shuts down the executor and closes the consumer createConsumerToRebalance() servers.foreach(server => killBroker(server.config.brokerId))
