This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6279b03  KAFKA-8118; Ensure ZK clients are closed in tests, fix 
verification (#6456)
6279b03 is described below

commit 6279b03813af58411951a94cfa3509f6ef14bec0
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Mar 18 08:47:28 2019 +0000

    KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (#6456)
    
    We verify that ZK clients are closed in tests since these can affect 
subsequent tests and that makes it hard to debug test failures. But because of 
changes to ZooKeeper client, we were checking the wrong thread name. The thread 
name used now is <creatorThreadName>-EventThread where creatorThreadName varies 
depending on the test. Fixed ZooKeeperTestHarness to check this format and 
fixed tests which were leaving ZK clients behind. Also added a test to make 
sure we can detect changes to  [...]
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>, Manikumar Reddy 
<manikumar.re...@gmail.com>
---
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  4 +--
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 30 ++++++++++++++--------
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala 
b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index ebb5fb7..5a62464 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -84,7 +84,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with 
Logging {
 }
 
 object ZooKeeperTestHarness {
-  val ZkClientEventThreadPrefix = "ZkClient-EventThread"
+  val ZkClientEventThreadSuffix = "-EventThread"
 
   // Threads which may cause transient failures in subsequent tests if not 
shutdown.
   // These include threads which make connections to brokers and may cause 
issues
@@ -94,7 +94,7 @@ object ZooKeeperTestHarness {
                                   KafkaProducer.NETWORK_THREAD_PREFIX,
                                   
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
                                   AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
-                                  ZkClientEventThreadPrefix)
+                                  ZkClientEventThreadSuffix)
 
   /**
    * Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't 
left behind an unexpected thread.
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala 
b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index fd3f59c..e943348 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -82,8 +82,16 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testConnection(): Unit = {
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
Int.MaxValue, time, "testMetricGroup",
-      "testMetricType").close()
+    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
+      "testMetricType")
+    try {
+      // Verify ZooKeeper event thread name. This is used in 
ZooKeeperTestHarness to verify that tests have closed ZK clients
+      val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
+      assertTrue(s"ZooKeeperClient event thread not found, threads=$threads",
+        
threads.exists(_.contains(ZooKeeperTestHarness.ZkClientEventThreadSuffix)))
+    } finally {
+      client.close()
+    }
   }
 
   @Test
@@ -327,14 +335,15 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, Int.MaxValue, time,
+    zooKeeperClient.close()
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, Int.MaxValue, time,
       "testMetricGroup", "testMetricType")
-    client.registerStateChangeHandler(stateChangeHandler)
+    zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
 
     val requestThread = new Thread() {
       override def run(): Unit = {
         try
-          client.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
+          zooKeeperClient.handleRequest(CreateRequest(mockPath, 
Array.empty[Byte],
             ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
         finally
           latch.countDown()
@@ -343,7 +352,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
     val reinitializeThread = new Thread() {
       override def run(): Unit = {
-        client.forceReinitialize()
+        zooKeeperClient.forceReinitialize()
       }
     }
 
@@ -375,12 +384,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, Int.MaxValue, time,
+    zooKeeperClient.close()
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, Int.MaxValue, time,
       "testMetricGroup", "testMetricType")
-    client.registerStateChangeHandler(faultyHandler)
-    client.registerStateChangeHandler(goodHandler)
+    zooKeeperClient.registerStateChangeHandler(faultyHandler)
+    zooKeeperClient.registerStateChangeHandler(goodHandler)
 
-    client.forceReinitialize()
+    zooKeeperClient.forceReinitialize()
 
     assertEquals(1, goodCalls.get)
 

Reply via email to