This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6279b03 KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456) 6279b03 is described below commit 6279b03813af58411951a94cfa3509f6ef14bec0 Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Mon Mar 18 08:47:28 2019 +0000 KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456) We verify that ZK clients are closed in tests since these can affect subsequent tests and that makes it hard to debug test failures. But because of changes to ZooKeeper client, we were checking the wrong thread name. The thread name used now is <creatorThreadName>-EventThread where creatorThreadName varies depending on the test. Fixed ZooKeeperTestHarness to check this format and fixed tests which were leaving ZK clients behind. Also added a test to make sure we can detect changes to [...] Reviewers: Ismael Juma <ism...@juma.me.uk>, Manikumar Reddy <manikumar.re...@gmail.com> --- .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +-- .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 30 ++++++++++++++-------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index ebb5fb7..5a62464 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -84,7 +84,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { } object ZooKeeperTestHarness { - val ZkClientEventThreadPrefix = "ZkClient-EventThread" + val ZkClientEventThreadSuffix = "-EventThread" // Threads which may cause transient failures in subsequent tests if not shutdown. // These include threads which make connections to brokers and may cause issues @@ -94,7 +94,7 @@ object ZooKeeperTestHarness { KafkaProducer.NETWORK_THREAD_PREFIX, AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, - ZkClientEventThreadPrefix) + ZkClientEventThreadSuffix) /** * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread. diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala index fd3f59c..e943348 100644 --- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -82,8 +82,16 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { @Test def testConnection(): Unit = { - new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup", - "testMetricType").close() + val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup", + "testMetricType") + try { + // Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients + val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName) + assertTrue(s"ZooKeeperClient event thread not found, threads=$threads", + threads.exists(_.contains(ZooKeeperTestHarness.ZkClientEventThreadSuffix))) + } finally { + client.close() + } } @Test @@ -327,14 +335,15 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } } - val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, + zooKeeperClient.close() + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup", "testMetricType") - client.registerStateChangeHandler(stateChangeHandler) + zooKeeperClient.registerStateChangeHandler(stateChangeHandler) val requestThread = new Thread() { override def run(): Unit = { try - client.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) finally latch.countDown() @@ -343,7 +352,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { val reinitializeThread = new Thread() { override def run(): Unit = { - client.forceReinitialize() + zooKeeperClient.forceReinitialize() } } @@ -375,12 +384,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness { } } - val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, + zooKeeperClient.close() + zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup", "testMetricType") - client.registerStateChangeHandler(faultyHandler) - client.registerStateChangeHandler(goodHandler) + zooKeeperClient.registerStateChangeHandler(faultyHandler) + zooKeeperClient.registerStateChangeHandler(goodHandler) - client.forceReinitialize() + zooKeeperClient.forceReinitialize() assertEquals(1, goodCalls.get)