This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 99ad0e01a92 [improve][cli] Make pulsar-perf termination more responsive by using Thread interrupt status (#24309)
99ad0e01a92 is described below

commit 99ad0e01a92bff75865f15fe14f30f32b61373e3
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 16 05:20:25 2025 +0300

    [improve][cli] Make pulsar-perf termination more responsive by using Thread interrupt status (#24309)
    
    ### Motivation
    
    Terminating pulsar-perf with CTRL-C could take a while. There's also a similar problem in some test cases that I came across while fixing test resource leaks.
    
    ### Modifications
    
    - replace `while(true)` loops with `while(!Thread.currentThread().isInterrupted())`
    - properly propagate the thread's interrupted status after catching exceptions
    - fix an issue in tests related to shutdown hook resource leaks
      - when the execution completes successfully, remove the shutdown hook and run it immediately, without waiting for JVM exit
---
 .../proxy/socket/client/PerformanceClient.java     | 19 ++++--
 .../apache/pulsar/testclient/BrokerMonitor.java    |  4 +-
 .../pulsar/testclient/LoadSimulationClient.java    |  8 ++-
 .../testclient/LoadSimulationController.java       |  4 +-
 .../pulsar/testclient/ManagedLedgerWriter.java     | 24 ++++---
 .../apache/pulsar/testclient/PerfClientUtils.java  | 79 ++++++++++++++++++++++
 .../pulsar/testclient/PerformanceConsumer.java     | 38 ++++++++---
 .../pulsar/testclient/PerformanceProducer.java     | 71 +++++++++++++------
 .../pulsar/testclient/PerformanceReader.java       | 11 +--
 .../pulsar/testclient/PerformanceTransaction.java  | 62 ++++++++++++-----
 .../pulsar/testclient/PerformanceProducerTest.java | 16 ++---
 11 files changed, 257 insertions(+), 79 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 4fabf6d2185..4053942c050 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -259,6 +259,9 @@ public class PerformanceClient extends CmdBase {
                     }
                 } catch (Exception e) {
                     log.error("Authentication plugin error: " + 
e.getMessage());
+                    if (PerfClientUtils.hasInterruptedException(e)) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
 
@@ -272,6 +275,9 @@ public class PerformanceClient extends CmdBase {
                 return;
             } catch (Exception e1) {
                 log.error("Fail in starting client[{}]", e1.getMessage());
+                if (PerfClientUtils.hasInterruptedException(e1)) {
+                    Thread.currentThread().interrupt();
+                }
                 return;
             }
 
@@ -288,7 +294,7 @@ public class PerformanceClient extends CmdBase {
                 long testEndTime = startTime + (long) (this.testTime * 1e9);
                 // Send messages on all topics/producers
                 long totalSent = 0;
-                while (true) {
+                while (!Thread.currentThread().isInterrupted()) {
                     for (String topic : producersMap.keySet()) {
                         if (this.testTime > 0 && System.nanoTime() > 
testEndTime) {
                             log.info("------------- DONE (reached the maximum 
duration: [{} seconds] of production) "
@@ -352,10 +358,11 @@ public class PerformanceClient extends CmdBase {
         histogramLogWriter.outputLogFormatVersion();
         histogramLogWriter.outputLegend();
 
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(5000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 break;
             }
 
@@ -399,6 +406,9 @@ public class PerformanceClient extends CmdBase {
             Class clz = classLoader.loadClass(formatterClass);
             return (IMessageFormatter) 
clz.getDeclaredConstructor().newInstance();
         } catch (Exception e) {
+            if (PerfClientUtils.hasInterruptedException(e)) {
+                Thread.currentThread().interrupt();
+            }
             return null;
         }
     }
@@ -408,11 +418,12 @@ public class PerformanceClient extends CmdBase {
         loadArguments();
         PerfClientUtils.printJVMInformation(log);
         long start = System.nanoTime();
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+        Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
             printAggregatedThroughput(start);
             printAggregatedStats();
-        }));
+        });
         runPerformanceTest();
+        PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
     }
 
     private class Tuple {
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index 6af4925a7c6..b875c92c795 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -472,7 +472,7 @@ public class BrokerMonitor extends CmdBase {
         try {
             final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient);
             brokerWatcher.updateBrokers(BROKER_ROOT);
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
                 printGlobalData();
             }
@@ -538,7 +538,7 @@ public class BrokerMonitor extends CmdBase {
 
     private void startBrokerLoadDataStoreMonitor() {
         try {
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
                 printBrokerLoadDataStore();
             }
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
index 115733d5ecd..1b8b71d8d06 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
@@ -125,7 +125,7 @@ public class LoadSimulationClient extends CmdBase{
         // messages continue to be sent after broker
         // restarts occur.
         private Producer<byte[]> getNewProducer() throws Exception {
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 try {
                     return client.newProducer()
                                 .topic(topic)
@@ -136,6 +136,7 @@ public class LoadSimulationClient extends CmdBase{
                     Thread.sleep(10000);
                 }
             }
+            throw new InterruptedException();
         }
 
         private class MutableBoolean {
@@ -151,6 +152,9 @@ public class LoadSimulationClient extends CmdBase{
                     // Unset the well flag in the case of an exception so we 
can
                     // try to get a new Producer.
                     wellnessFlag.value = false;
+                    if (PerfClientUtils.hasInterruptedException(e)) {
+                        Thread.currentThread().interrupt();
+                    }
                     return null;
                 };
                 while (!stop.get() && wellnessFlag.value) {
@@ -345,7 +349,7 @@ public class LoadSimulationClient extends CmdBase{
     public void start() throws Exception {
         final ServerSocket serverSocket = new ServerSocket(port);
 
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             // Technically, two controllers can be connected simultaneously, 
but
             // non-sequential handling of commands
             // has not been tested or considered and is not recommended.
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
index 99f443f26d7..2ba3eac171b 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
@@ -523,7 +523,7 @@ public class LoadSimulationController extends CmdBase{
             // This controller will now stream rate changes from the given ZK.
             // Users wishing to stop this should Ctrl + C and use another
             // Controller to send new commands.
-            while (true) {}
+            Thread.currentThread().join();
         }
     }
 
@@ -677,7 +677,7 @@ public class LoadSimulationController extends CmdBase{
      */
     public void start() throws Exception {
         BufferedReader inReader = new BufferedReader(new 
InputStreamReader(System.in));
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             // Print the very simple prompt.
             System.out.println();
             System.out.print("> ");
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 8913d174742..3d73abd803f 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -69,9 +69,6 @@ import picocli.CommandLine.Spec;
 @Command(name = "managed-ledger", description = "Write directly on 
managed-ledgers")
 public class ManagedLedgerWriter extends CmdBase{
 
-    private static final ExecutorService executor = Executors
-            .newCachedThreadPool(new 
DefaultThreadFactory("pulsar-perf-managed-ledger-exec"));
-
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
     private static final LongAdder totalMessagesSent = new LongAdder();
@@ -220,10 +217,13 @@ public class ManagedLedgerWriter extends CmdBase{
         log.info("Created {} managed ledgers", managedLedgers.size());
 
         long start = System.nanoTime();
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+        ExecutorService executor = Executors
+                .newCachedThreadPool(new 
DefaultThreadFactory("pulsar-perf-managed-ledger-exec"));
+        Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
+            executor.shutdownNow();
             printAggregatedThroughput(start);
             printAggregatedStats();
-        }));
+        });
 
         Collections.shuffle(managedLedgers);
         AtomicBoolean isDone = new AtomicBoolean();
@@ -274,7 +274,7 @@ public class ManagedLedgerWriter extends CmdBase{
 
                     // Send messages on all topics/producers
                     long totalSent = 0;
-                    while (true) {
+                    while (!Thread.currentThread().isInterrupted()) {
                         for (int j = 0; j < nunManagedLedgersForThisThread; 
j++) {
                             if (this.testTime > 0) {
                                 if (System.nanoTime() > testEndTime) {
@@ -304,7 +304,11 @@ public class ManagedLedgerWriter extends CmdBase{
                         }
                     }
                 } catch (Throwable t) {
-                    log.error("Got error", t);
+                    if (PerfClientUtils.hasInterruptedException(t)) {
+                        Thread.currentThread().interrupt();
+                    } else {
+                        log.error("Got error", t);
+                    }
                 }
             });
         }
@@ -314,10 +318,11 @@ public class ManagedLedgerWriter extends CmdBase{
 
         Histogram reportHistogram = null;
 
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(10000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 break;
             }
 
@@ -354,8 +359,9 @@ public class ManagedLedgerWriter extends CmdBase{
         }
 
         factory.shutdown();
-    }
 
+        PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
+    }
 
     public static <T> Map<Integer, List<T>> allocateToThreads(List<T> 
managedLedgers, int numThreads) {
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 1e2f6231c6e..18d738f6ae3 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -39,6 +40,7 @@ import org.slf4j.Logger;
 /**
  * Utility for test clients.
  */
+@Slf4j
 @UtilityClass
 public class PerfClientUtils {
 
@@ -136,4 +138,81 @@ public class PerfClientUtils {
         return pulsarAdminBuilder;
     }
 
+    /**
+     * This is used to register a shutdown hook that will be called when the 
JVM exits.
+     * @param runnable the runnable to run on shutdown
+     * @return the thread that was registered as a shutdown hook
+     */
+    public static Thread addShutdownHook(Runnable runnable) {
+        Thread shutdownHookThread = new Thread(runnable, 
"perf-client-shutdown");
+        Runtime.getRuntime().addShutdownHook(shutdownHookThread);
+        return shutdownHookThread;
+    }
+
+    /**
+     * This is used to remove a previously registered shutdown hook and run it 
immediately.
+     * This is useful at least for tests when there are multiple instances of 
the classes
+     * in the JVM. It will also prevent resource leaks when test code isn't 
relying on the JVM
+     * exit to clean up resources.
+     * @param shutdownHookThread the shutdown hook thread to remove and run
+     * @throws InterruptedException if the thread is interrupted while waiting 
for it to finish
+     */
+    public static void removeAndRunShutdownHook(Thread shutdownHookThread) 
throws InterruptedException {
+        // clear interrupted status and restore later
+        boolean wasInterrupted = Thread.currentThread().interrupted();
+        try {
+            Runtime.getRuntime().removeShutdownHook(shutdownHookThread);
+            shutdownHookThread.start();
+            shutdownHookThread.join();
+        } finally {
+            if (wasInterrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * This is used to close the client so that the interrupted status is 
cleared before
+     * closing the client. This is needed if the thread is already interrupted 
before calling this method.
+     * @param client the client to close
+     */
+    public static void closeClient(PulsarClient client) {
+        if (client == null) {
+            return;
+        }
+        // clear interrupted status so that the client can be shutdown
+        boolean wasInterrupted = Thread.currentThread().interrupted();
+        try {
+            client.close();
+        } catch (PulsarClientException e) {
+            log.error("Failed to close client", e);
+        } finally {
+            if (wasInterrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Check if the throwable or any of its causes is an InterruptedException.
+     *
+     * @param throwable the throwable to check
+     * @return true if the throwable or any of its causes is an 
InterruptedException, false otherwise
+     */
+    public static boolean hasInterruptedException(Throwable throwable) {
+        if (throwable == null) {
+            return false;
+        }
+        if (throwable instanceof InterruptedException) {
+            return true;
+        }
+        Throwable cause = throwable.getCause();
+        while (cause != null) {
+            if (cause instanceof InterruptedException) {
+                return true;
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
 }
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 5126eefd9ca..29d7a254911 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -287,6 +287,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                     messageReceiveLimiter.acquire();
                 } catch (InterruptedException e){
                     log.error("Got error: ", e);
+                    Thread.currentThread().interrupt();
                 }
                 consumer.acknowledgeAsync(msg.getMessageId(), 
atomicReference.get()).thenRun(() -> {
                     totalMessageAck.increment();
@@ -294,6 +295,9 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                 }).exceptionally(throwable ->{
                     log.error("Ack message {} failed with exception", msg, 
throwable);
                     totalMessageAckFailed.increment();
+                    if (PerfClientUtils.hasInterruptedException(throwable)) {
+                        Thread.currentThread().interrupt();
+                    }
                     return null;
                 });
             } else {
@@ -302,6 +306,10 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                             messageAck.increment();
                         }
                 ).exceptionally(throwable ->{
+                            if 
(PerfClientUtils.hasInterruptedException(throwable)) {
+                                Thread.currentThread().interrupt();
+                                return null;
+                            }
                             log.error("Ack message {} failed with exception", 
msg, throwable);
                             totalMessageAckFailed.increment();
                             return null;
@@ -324,6 +332,10 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                                 numTxnOpSuccess.increment();
                             })
                             .exceptionally(exception -> {
+                                if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                    Thread.currentThread().interrupt();
+                                    return null;
+                                }
                                 log.error("Commit transaction failed with 
exception : ", exception);
                                 totalEndTxnOpFailNum.increment();
                                 return null;
@@ -336,6 +348,10 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                         totalEndTxnOpSuccessNum.increment();
                         numTxnOpSuccess.increment();
                     }).exceptionally(exception -> {
+                        if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                            Thread.currentThread().interrupt();
+                            return null;
+                        }
                         log.error("Abort transaction {} failed with exception",
                                 transaction.getTxnID().toString(),
                                 exception);
@@ -343,7 +359,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                         return null;
                     });
                 }
-                while (true) {
+                while (!Thread.currentThread().isInterrupted()) {
                     try {
                         Transaction newTransaction = 
pulsarClient.newTransaction()
                                 
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
@@ -354,8 +370,12 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                         
messageReceiveLimiter.release(this.numMessagesPerTransaction);
                         break;
                     } catch (Exception e) {
-                        log.error("Failed to new transaction with exception:", 
e);
-                        totalNumTxnOpenFail.increment();
+                        if (PerfClientUtils.hasInterruptedException(e)) {
+                            Thread.currentThread().interrupt();
+                        } else {
+                            log.error("Failed to new transaction with 
exception:", e);
+                            totalNumTxnOpenFail.increment();
+                        }
                     }
                 }
             }
@@ -408,11 +428,10 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
 
         long start = System.nanoTime();
 
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+        Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
             printAggregatedThroughput(start);
             printAggregatedStats();
-        }));
-
+        });
 
         long oldTime = System.nanoTime();
 
@@ -432,10 +451,11 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
             histogramLogWriter.outputLegend();
         }
 
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(10000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 break;
             }
 
@@ -508,8 +528,8 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                 }
             }
         }
-
-        pulsarClient.close();
+        PerfClientUtils.closeClient(pulsarClient);
+        PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
     }
 
     private void printAggregatedThroughput(long start) {
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index ba5be3a3c45..8860696321a 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,10 +81,6 @@ import picocli.CommandLine.TypeConversionException;
  */
 @Command(name = "produce", description = "Test pulsar producer performance.")
 public class PerformanceProducer extends PerformanceTopicListArguments{
-
-    private static final ExecutorService executor = Executors
-            .newCachedThreadPool(new 
DefaultThreadFactory("pulsar-perf-producer-exec"));
-
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder messagesFailed = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
@@ -287,11 +284,13 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
 
         long start = System.nanoTime();
 
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            executorShutdownNow();
+        ExecutorService executor = Executors
+                .newCachedThreadPool(new 
DefaultThreadFactory("pulsar-perf-producer-exec"));
+        Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
+            executorShutdownNow(executor);
             printAggregatedThroughput(start);
             printAggregatedStats();
-        }));
+        });
 
         if (this.partitions  != null) {
             final PulsarAdminBuilder adminBuilder = PerfClientUtils
@@ -358,10 +357,11 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
             histogramLogWriter.outputLegend();
         }
 
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(10000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 break;
             }
 
@@ -412,12 +412,15 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
 
             oldTime = now;
         }
+
+        PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
     }
+
     public PerformanceProducer() {
         super("produce");
     }
 
-    private static void executorShutdownNow() {
+    private static void executorShutdownNow(ExecutorService executor) {
         executor.shutdownNow();
         try {
             if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
@@ -426,6 +429,7 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
             }
         } catch (InterruptedException e) {
             log.warn("Shutdown of thread pool was interrupted");
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -435,6 +439,9 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
             Class clz = classLoader.loadClass(formatterClass);
             return (IMessageFormatter) 
clz.getDeclaredConstructor().newInstance();
         } catch (Exception e) {
+            if (PerfClientUtils.hasInterruptedException(e)) {
+                Thread.currentThread().interrupt();
+            }
             return null;
         }
     }
@@ -552,7 +559,7 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
             AtomicLong totalSent = new AtomicLong(0);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new 
Semaphore(this.numMessagesPerTransaction);
-            while (true) {
+            while (!Thread.currentThread().isInterrupted()) {
                 if (produceEnough) {
                     break;
                 }
@@ -601,6 +608,7 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
                                 numMsgPerTxnLimit.acquire();
                             } catch (InterruptedException exception){
                                 log.error("Get exception: ", exception);
+                                Thread.currentThread().interrupt();
                             }
                         }
                         messageBuilder = producer.newMessage(transaction)
@@ -642,7 +650,16 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
                     }).exceptionally(ex -> {
                         // Ignore the exception of recorder since a very large 
latencyMicros will lead
                         // ArrayIndexOutOfBoundsException in AbstractHistogram
-                        if (ex.getCause() instanceof 
ArrayIndexOutOfBoundsException) {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof ArrayIndexOutOfBoundsException) {
+                            return null;
+                        }
+                        // Ignore the exception when the producer is closed
+                        if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
+                            return null;
+                        }
+                        if (PerfClientUtils.hasInterruptedException(ex)) {
+                            Thread.currentThread().interrupt();
                             return null;
                         }
                         log.warn("Write message error with exception", ex);
@@ -665,6 +682,10 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
                                         numTxnOpSuccess.increment();
                                     })
                                     .exceptionally(exception -> {
+                                        if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                            Thread.currentThread().interrupt();
+                                            return null;
+                                        }
                                         log.error("Commit transaction failed 
with exception : ",
                                                 exception);
                                         totalEndTxnOpFailNum.increment();
@@ -678,6 +699,10 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
                                 totalEndTxnOpSuccessNum.increment();
                                 numTxnOpSuccess.increment();
                             }).exceptionally(exception -> {
+                                if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                    Thread.currentThread().interrupt();
+                                    return null;
+                                }
                                 log.error("Abort transaction {} failed with 
exception",
                                         transaction.getTxnID().toString(),
                                         exception);
@@ -685,7 +710,7 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
                                 return null;
                             });
                         }
-                        while (true) {
+                        while (!Thread.currentThread().isInterrupted()) {
                             try {
                                 Transaction newTransaction = 
pulsarClient.newTransaction()
                                         
.withTransactionTimeout(this.transactionTimeout,
@@ -696,26 +721,28 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
                                 totalNumTxnOpenTxnSuccess.increment();
                                 break;
                             } catch (Exception e){
-                                totalNumTxnOpenTxnFail.increment();
-                                log.error("Failed to new transaction with 
exception: ", e);
+                                if 
(PerfClientUtils.hasInterruptedException(e)) {
+                                    Thread.currentThread().interrupt();
+                                } else {
+                                    totalNumTxnOpenTxnFail.increment();
+                                    log.error("Failed to new transaction with 
exception: ", e);
+                                }
                             }
                         }
                     }
                 }
             }
         } catch (Throwable t) {
-            log.error("Got error", t);
+            if (PerfClientUtils.hasInterruptedException(t)) {
+                Thread.currentThread().interrupt();
+            } else {
+                log.error("Got error", t);
+            }
         } finally {
             if (!produceEnough) {
                 doneLatch.countDown();
             }
-            if (null != client) {
-                try {
-                    client.close();
-                } catch (PulsarClientException e) {
-                    log.error("Failed to close test client", e);
-                }
-            }
+            PerfClientUtils.closeClient(client);
         }
     }
 
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 3c6940b262f..d4170509a6e 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.testclient;
 
+import static org.apache.pulsar.testclient.PerfClientUtils.addShutdownHook;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.util.concurrent.RateLimiter;
@@ -163,10 +164,10 @@ public class PerformanceReader extends 
PerformanceTopicListArguments {
         log.info("Start reading from {} topics", this.numTopics);
 
         final long start = System.nanoTime();
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+        Thread shutdownHookThread = addShutdownHook(() -> {
             printAggregatedThroughput(start);
             printAggregatedStats();
-        }));
+        });
 
         if (this.testTime > 0) {
             TimerTask timoutTask = new TimerTask() {
@@ -184,10 +185,11 @@ public class PerformanceReader extends 
PerformanceTopicListArguments {
         long oldTime = System.nanoTime();
         Histogram reportHistogram = null;
 
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             try {
                 Thread.sleep(10000);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 break;
             }
 
@@ -211,7 +213,8 @@ public class PerformanceReader extends 
PerformanceTopicListArguments {
             oldTime = now;
         }
 
-        pulsarClient.close();
+        PerfClientUtils.closeClient(pulsarClient);
+        PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
     }
     private static void printAggregatedThroughput(long start) {
         double elapsed = (System.nanoTime() - start) / 1e9;
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 943cfaf4510..27f156eb033 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.testclient;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.pulsar.testclient.PerfClientUtils.addShutdownHook;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.util.concurrent.RateLimiter;
@@ -212,7 +213,8 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
         ClientBuilder clientBuilder = 
PerfClientUtils.createClientBuilderFromArguments(this)
                 .enableTransaction(!this.isDisableTransaction);
 
-        try (PulsarClient client = clientBuilder.build()) {
+        PulsarClient client = clientBuilder.build();
+        try {
 
             ExecutorService executorService = new 
ThreadPoolExecutor(this.numTestThreads,
                     this.numTestThreads,
@@ -222,14 +224,14 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
 
             long startTime = System.nanoTime();
             long testEndTime = startTime + (long) (this.testTime * 1e9);
-            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            Thread shutdownHookThread = addShutdownHook(() -> {
                 if (!this.isDisableTransaction) {
                     printTxnAggregatedThroughput(startTime);
                 } else {
                     printAggregatedThroughput(startTime);
                 }
                 printAggregatedStats();
-            }));
+            });
 
             // start perf test
             AtomicBoolean executing = new AtomicBoolean(true);
@@ -257,13 +259,17 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                             atomicReference = new AtomicReference<>(null);
                         }
                     } catch (Exception e) {
-                        log.error("Failed to build Producer/Consumer with 
exception : ", e);
+                        if (PerfClientUtils.hasInterruptedException(e)) {
+                            Thread.currentThread().interrupt();
+                        } else {
+                            log.error("Failed to build Producer/Consumer with 
exception : ", e);
+                        }
                         executorService.shutdownNow();
                         PerfClientUtils.exit(1);
                     }
                     //The while loop has no break, and finally ends the 
execution through the shutdownNow of
                     //the executorService
-                    while (true) {
+                    while (!Thread.currentThread().isInterrupted()) {
                         if (this.numTransactions > 0) {
                             if (totalNumTxnOpenTxnFail.sum()
                                     + totalNumTxnOpenTxnSuccess.sum() >= 
this.numTransactions) {
@@ -309,7 +315,8 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                                     
messageAckCumulativeRecorder.recordValue(latencyMicros);
                                                     
numMessagesAckSuccess.increment();
                                                 }).exceptionally(exception -> {
-                                                    if (exception instanceof 
InterruptedException && !executing.get()) {
+                                                    if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                                        
Thread.currentThread().interrupt();
                                                         return null;
                                                     }
                                                     log.error(
@@ -326,7 +333,8 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                             
messageAckCumulativeRecorder.recordValue(latencyMicros);
                                             numMessagesAckSuccess.increment();
                                         }).exceptionally(exception -> {
-                                            if (exception instanceof 
InterruptedException && !executing.get()) {
+                                            if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                                
Thread.currentThread().interrupt();
                                                 return null;
                                             }
                                             log.error(
@@ -352,7 +360,13 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                                 
messageSendRCumulativeRecorder.recordValue(latencyMicros);
                                                 
numMessagesSendSuccess.increment();
                                             }).exceptionally(exception -> {
-                                                if (exception instanceof 
InterruptedException && !executing.get()) {
+                                                if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                                    
Thread.currentThread().interrupt();
+                                                    return null;
+                                                }
+                                                // Ignore the exception when 
the producer is closed
+                                                if (exception.getCause()
+                                                        instanceof 
PulsarClientException.AlreadyClosedException) {
                                                     return null;
                                                 }
                                                 log.error("Send transaction 
message failed with exception : ",
@@ -369,7 +383,13 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                                 
messageSendRCumulativeRecorder.recordValue(latencyMicros);
                                                 
numMessagesSendSuccess.increment();
                                             }).exceptionally(exception -> {
-                                                if (exception instanceof 
InterruptedException && !executing.get()) {
+                                                if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                                    
Thread.currentThread().interrupt();
+                                                    return null;
+                                                }
+                                                // Ignore the exception when 
the producer is closed
+                                                if (exception.getCause()
+                                                        instanceof 
PulsarClientException.AlreadyClosedException) {
                                                     return null;
                                                 }
                                                 log.error("Send message failed 
with exception : ", exception);
@@ -390,7 +410,8 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                             numTxnOpSuccess.increment();
                                             
totalNumEndTxnOpSuccess.increment();
                                         }).exceptionally(exception -> {
-                                            if (exception instanceof 
InterruptedException && !executing.get()) {
+                                            if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                                
Thread.currentThread().interrupt();
                                                 return null;
                                             }
                                             log.error("Commit transaction {} 
failed with exception",
@@ -404,7 +425,8 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                     numTxnOpSuccess.increment();
                                     totalNumEndTxnOpSuccess.increment();
                                 }).exceptionally(exception -> {
-                                    if (exception instanceof 
InterruptedException && !executing.get()) {
+                                    if 
(PerfClientUtils.hasInterruptedException(exception)) {
+                                        Thread.currentThread().interrupt();
                                         return null;
                                     }
                                     log.error("Commit transaction {} failed 
with exception",
@@ -414,7 +436,7 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                     return null;
                                 });
                             }
-                            while (true) {
+                            while (!Thread.currentThread().isInterrupted()) {
                                 try {
                                     Transaction newTransaction = 
client.newTransaction()
                                             
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
@@ -424,11 +446,12 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
                                     totalNumTxnOpenTxnSuccess.increment();
                                     break;
                                 } catch (Exception throwable) {
-                                    if (throwable instanceof 
InterruptedException && !executing.get()) {
-                                        break;
+                                    if 
(PerfClientUtils.hasInterruptedException(throwable)) {
+                                        Thread.currentThread().interrupt();
+                                    } else {
+                                        log.error("Failed to new transaction 
with exception: ", throwable);
+                                        totalNumTxnOpenTxnFail.increment();
                                     }
-                                    log.error("Failed to new transaction with 
exception: ", throwable);
-                                    totalNumTxnOpenTxnFail.increment();
                                 }
                             }
                         } else {
@@ -457,10 +480,11 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
             histogramLogWriter.outputLogFormatVersion();
             histogramLogWriter.outputLegend();
 
-            while (executing.get()) {
+            while (!Thread.currentThread().isInterrupted() && executing.get()) 
{
                 try {
                     Thread.sleep(10000);
                 } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                     break;
                 }
                 long now = System.nanoTime();
@@ -502,6 +526,10 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
 
                 oldTime = now;
             }
+
+            PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
+        } finally {
+            PerfClientUtils.closeClient(client);
         }
     }
 
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 519bed6cdb5..c7f2077bc5d 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -217,20 +217,20 @@ public class PerformanceProducerTest extends 
MockedPulsarServiceBaseTest {
         String topic = testTopic + UUID.randomUUID().toString();
         String args = String.format(argString, topic, 
pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
-                .subscriptionType(SubscriptionType.Key_Shared).subscribe();
-        new Thread(() -> {
+                .subscriptionType(SubscriptionType.Shared).subscribe();
+        Thread thread = new Thread(() -> {
             try {
                 PerformanceProducer producer = new PerformanceProducer();
                 producer.run(args.split(" "));
             } catch (Exception e) {
                 log.error("Failed to start perf producer");
             }
-        }).start();
-        Awaitility.await()
-                .untilAsserted(() -> {
-                    Message<byte[]> message = consumer.receive(3, 
TimeUnit.SECONDS);
-                    assertNotNull(message);
-                });
+        });
+        thread.start();
+        Message<byte[]> message = consumer.receive(15, TimeUnit.SECONDS);
+        assertNotNull(message);
+        thread.interrupt();
+        thread.join();
         consumer.close();
     }
 


Reply via email to