heesung-sn commented on code in PR #24162:
URL: https://github.com/apache/pulsar/pull/24162#discussion_r2033585505


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java:
##########
@@ -95,9 +97,5 @@ public void 
testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception
         PartitionedTopicMetadata partitionedTopicMetadata3 =
                 
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName2, 
true).get();
         assertEquals(partitionedTopicMetadata3.partitions, 3);
-
-        // cleanup.
-        admin.topics().deletePartitionedTopic(topicName2.toString());

Review Comment:
   Can we skip this topic cleanup?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java:
##########
@@ -95,54 +97,66 @@ protected void startBrokers() throws Exception {
         client = PulsarClient.builder().serviceUrl(url.toString()).build();
     }
 
-    protected void makeLocalMetadataStoreKeepReconnect() throws Exception {
-        if (!LocalMetadataStoreInReconnectFinishSignal.compareAndSet(false, 
true)) {
-            throw new RuntimeException("Local metadata store is already 
keeping reconnect");
+    protected void startLocalMetadataStoreConnectionTermination() throws 
Exception {
+        if (!connectionTerminationThreadKeepRunning.compareAndSet(false, 
true)) {
+            throw new RuntimeException("Local metadata store connection is 
already being terminated");
         }
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if 
(localMetaDataStoreClientCnx.getClass().getSimpleName().equals("ClientCnxnSocketNIO"))
 {
-            makeLocalMetadataStoreKeepReconnectNIO();
+            startNIOImplTermination(future);
         } else {
             // ClientCnxnSocketNetty.
-            makeLocalMetadataStoreKeepReconnectNetty();
+            startNettyImplTermination(future);
         }
+        // wait until connection is closed at least once
+        future.get();
     }
 
-    protected void makeLocalMetadataStoreKeepReconnectNIO() {
-        new Thread(() -> {
-            while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+    private void startNIOImplTermination(CompletableFuture<Void> future) {
+        connectionTerminationThread = new Thread(() -> {
+            while (connectionTerminationThreadKeepRunning.get()) {
                 try {
                     SelectionKey sockKey = 
WhiteboxImpl.getInternalState(localMetaDataStoreClientCnx, "sockKey");
                     if (sockKey != null) {
                         sockKey.channel().close();
+                        future.complete(null);
                     }
                     // Prevents high cpu usage.
                     Thread.sleep(5);
                 } catch (Exception e) {
                     log.error("Try close the ZK connection of local metadata 
store failed: {}", e.toString());
                 }
             }
-        }).start();
+        });
+        connectionTerminationThread.start();
     }
 
-    protected void makeLocalMetadataStoreKeepReconnectNetty() {
-        new Thread(() -> {
-            while (LocalMetadataStoreInReconnectFinishSignal.get()) {
+    private void startNettyImplTermination(CompletableFuture<Void> future) {
+        connectionTerminationThread = new Thread(() -> {

Review Comment:
   Is it safe to reassign `connectionTerminationThread` to a new thread here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to