This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e3ed8fcf343 [improve][cli] Make pulsar-perf termination more
responsive by using Thread interrupt status (#24309)
e3ed8fcf343 is described below
commit e3ed8fcf343eb595cb32de0620123d54cb6f8334
Author: Lari Hotari <[email protected]>
AuthorDate: Fri May 16 05:20:25 2025 +0300
[improve][cli] Make pulsar-perf termination more responsive by using Thread
interrupt status (#24309)
### Motivation
Terminating pulsar-perf with CTRL-C could take a while. There's also a
similar problem in some test cases that I came across while fixing test
resource leaks.
### Modifications
- replace `while(true)` loops with
`while(!Thread.currentThread().isInterrupted())`
- properly propagate Thread interrupted information after catching
exceptions
- fix issue in tests related to runtime hook resource leaks
- when the execution completes successfully, remove the shutdown hook and
run it without waiting for JVM exit
(cherry picked from commit 99ad0e01a92bff75865f15fe14f30f32b61373e3)
---
.../proxy/socket/client/PerformanceClient.java | 19 ++++--
.../apache/pulsar/testclient/BrokerMonitor.java | 4 +-
.../pulsar/testclient/LoadSimulationClient.java | 8 ++-
.../testclient/LoadSimulationController.java | 4 +-
.../pulsar/testclient/ManagedLedgerWriter.java | 24 ++++---
.../apache/pulsar/testclient/PerfClientUtils.java | 79 ++++++++++++++++++++++
.../pulsar/testclient/PerformanceConsumer.java | 38 ++++++++---
.../pulsar/testclient/PerformanceProducer.java | 71 +++++++++++++------
.../pulsar/testclient/PerformanceReader.java | 11 +--
.../pulsar/testclient/PerformanceTransaction.java | 62 ++++++++++++-----
.../pulsar/testclient/PerformanceProducerTest.java | 16 ++---
11 files changed, 257 insertions(+), 79 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 4fabf6d2185..4053942c050 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -259,6 +259,9 @@ public class PerformanceClient extends CmdBase {
}
} catch (Exception e) {
log.error("Authentication plugin error: " +
e.getMessage());
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ }
}
}
@@ -272,6 +275,9 @@ public class PerformanceClient extends CmdBase {
return;
} catch (Exception e1) {
log.error("Fail in starting client[{}]", e1.getMessage());
+ if (PerfClientUtils.hasInterruptedException(e1)) {
+ Thread.currentThread().interrupt();
+ }
return;
}
@@ -288,7 +294,7 @@ public class PerformanceClient extends CmdBase {
long testEndTime = startTime + (long) (this.testTime * 1e9);
// Send messages on all topics/producers
long totalSent = 0;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
for (String topic : producersMap.keySet()) {
if (this.testTime > 0 && System.nanoTime() >
testEndTime) {
log.info("------------- DONE (reached the maximum
duration: [{} seconds] of production) "
@@ -352,10 +358,11 @@ public class PerformanceClient extends CmdBase {
histogramLogWriter.outputLogFormatVersion();
histogramLogWriter.outputLegend();
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
@@ -399,6 +406,9 @@ public class PerformanceClient extends CmdBase {
Class clz = classLoader.loadClass(formatterClass);
return (IMessageFormatter)
clz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ }
return null;
}
}
@@ -408,11 +418,12 @@ public class PerformanceClient extends CmdBase {
loadArguments();
PerfClientUtils.printJVMInformation(log);
long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
printAggregatedThroughput(start);
printAggregatedStats();
- }));
+ });
runPerformanceTest();
+ PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
}
private class Tuple {
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index 6af4925a7c6..b875c92c795 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -472,7 +472,7 @@ public class BrokerMonitor extends CmdBase {
try {
final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient);
brokerWatcher.updateBrokers(BROKER_ROOT);
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
printGlobalData();
}
@@ -538,7 +538,7 @@ public class BrokerMonitor extends CmdBase {
private void startBrokerLoadDataStoreMonitor() {
try {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS);
printBrokerLoadDataStore();
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
index 115733d5ecd..1b8b71d8d06 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
@@ -125,7 +125,7 @@ public class LoadSimulationClient extends CmdBase{
// messages continue to be sent after broker
// restarts occur.
private Producer<byte[]> getNewProducer() throws Exception {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
return client.newProducer()
.topic(topic)
@@ -136,6 +136,7 @@ public class LoadSimulationClient extends CmdBase{
Thread.sleep(10000);
}
}
+ throw new InterruptedException();
}
private class MutableBoolean {
@@ -151,6 +152,9 @@ public class LoadSimulationClient extends CmdBase{
// Unset the well flag in the case of an exception so we
can
// try to get a new Producer.
wellnessFlag.value = false;
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ }
return null;
};
while (!stop.get() && wellnessFlag.value) {
@@ -345,7 +349,7 @@ public class LoadSimulationClient extends CmdBase{
public void start() throws Exception {
final ServerSocket serverSocket = new ServerSocket(port);
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
// Technically, two controllers can be connected simultaneously,
but
// non-sequential handling of commands
// has not been tested or considered and is not recommended.
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
index 99f443f26d7..2ba3eac171b 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
@@ -523,7 +523,7 @@ public class LoadSimulationController extends CmdBase{
// This controller will now stream rate changes from the given ZK.
// Users wishing to stop this should Ctrl + C and use another
// Controller to send new commands.
- while (true) {}
+ Thread.currentThread().join();
}
}
@@ -677,7 +677,7 @@ public class LoadSimulationController extends CmdBase{
*/
public void start() throws Exception {
BufferedReader inReader = new BufferedReader(new
InputStreamReader(System.in));
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
// Print the very simple prompt.
System.out.println();
System.out.print("> ");
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 8913d174742..3d73abd803f 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -69,9 +69,6 @@ import picocli.CommandLine.Spec;
@Command(name = "managed-ledger", description = "Write directly on
managed-ledgers")
public class ManagedLedgerWriter extends CmdBase{
- private static final ExecutorService executor = Executors
- .newCachedThreadPool(new
DefaultThreadFactory("pulsar-perf-managed-ledger-exec"));
-
private static final LongAdder messagesSent = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();
private static final LongAdder totalMessagesSent = new LongAdder();
@@ -220,10 +217,13 @@ public class ManagedLedgerWriter extends CmdBase{
log.info("Created {} managed ledgers", managedLedgers.size());
long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ ExecutorService executor = Executors
+ .newCachedThreadPool(new
DefaultThreadFactory("pulsar-perf-managed-ledger-exec"));
+ Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
+ executor.shutdownNow();
printAggregatedThroughput(start);
printAggregatedStats();
- }));
+ });
Collections.shuffle(managedLedgers);
AtomicBoolean isDone = new AtomicBoolean();
@@ -274,7 +274,7 @@ public class ManagedLedgerWriter extends CmdBase{
// Send messages on all topics/producers
long totalSent = 0;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
for (int j = 0; j < nunManagedLedgersForThisThread;
j++) {
if (this.testTime > 0) {
if (System.nanoTime() > testEndTime) {
@@ -304,7 +304,11 @@ public class ManagedLedgerWriter extends CmdBase{
}
}
} catch (Throwable t) {
- log.error("Got error", t);
+ if (PerfClientUtils.hasInterruptedException(t)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error("Got error", t);
+ }
}
});
}
@@ -314,10 +318,11 @@ public class ManagedLedgerWriter extends CmdBase{
Histogram reportHistogram = null;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
@@ -354,8 +359,9 @@ public class ManagedLedgerWriter extends CmdBase{
}
factory.shutdown();
- }
+ PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
+ }
public static <T> Map<Integer, List<T>> allocateToThreads(List<T>
managedLedgers, int numThreads) {
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 1e2f6231c6e..18d738f6ae3 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -26,6 +26,7 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -39,6 +40,7 @@ import org.slf4j.Logger;
/**
* Utility for test clients.
*/
+@Slf4j
@UtilityClass
public class PerfClientUtils {
@@ -136,4 +138,81 @@ public class PerfClientUtils {
return pulsarAdminBuilder;
}
+ /**
+ * This is used to register a shutdown hook that will be called when the
JVM exits.
+ * @param runnable the runnable to run on shutdown
+ * @return the thread that was registered as a shutdown hook
+ */
+ public static Thread addShutdownHook(Runnable runnable) {
+ Thread shutdownHookThread = new Thread(runnable,
"perf-client-shutdown");
+ Runtime.getRuntime().addShutdownHook(shutdownHookThread);
+ return shutdownHookThread;
+ }
+
+ /**
+ * This is used to remove a previously registered shutdown hook and run it
immediately.
+ * This is useful at least for tests when there are multiple instances of
the classes
+ * in the JVM. It will also prevent resource leaks when test code isn't
relying on the JVM
+ * exit to clean up resources.
+ * @param shutdownHookThread the shutdown hook thread to remove and run
+ * @throws InterruptedException if the thread is interrupted while waiting
for it to finish
+ */
+ public static void removeAndRunShutdownHook(Thread shutdownHookThread)
throws InterruptedException {
+ // clear interrupted status and restore later
+ boolean wasInterrupted = Thread.currentThread().interrupted();
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHookThread);
+ shutdownHookThread.start();
+ shutdownHookThread.join();
+ } finally {
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * This is used to close the client so that the interrupted status is
cleared before
+ * closing the client. This is needed if the thread is already interrupted
before calling this method.
+ * @param client the client to close
+ */
+ public static void closeClient(PulsarClient client) {
+ if (client == null) {
+ return;
+ }
+ // clear interrupted status so that the client can be shutdown
+ boolean wasInterrupted = Thread.currentThread().interrupted();
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.error("Failed to close client", e);
+ } finally {
+ if (wasInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Check if the throwable or any of its causes is an InterruptedException.
+ *
+ * @param throwable the throwable to check
+ * @return true if the throwable or any of its causes is an
InterruptedException, false otherwise
+ */
+ public static boolean hasInterruptedException(Throwable throwable) {
+ if (throwable == null) {
+ return false;
+ }
+ if (throwable instanceof InterruptedException) {
+ return true;
+ }
+ Throwable cause = throwable.getCause();
+ while (cause != null) {
+ if (cause instanceof InterruptedException) {
+ return true;
+ }
+ cause = cause.getCause();
+ }
+ return false;
+ }
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 5126eefd9ca..29d7a254911 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -287,6 +287,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
messageReceiveLimiter.acquire();
} catch (InterruptedException e){
log.error("Got error: ", e);
+ Thread.currentThread().interrupt();
}
consumer.acknowledgeAsync(msg.getMessageId(),
atomicReference.get()).thenRun(() -> {
totalMessageAck.increment();
@@ -294,6 +295,9 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
}).exceptionally(throwable ->{
log.error("Ack message {} failed with exception", msg,
throwable);
totalMessageAckFailed.increment();
+ if (PerfClientUtils.hasInterruptedException(throwable)) {
+ Thread.currentThread().interrupt();
+ }
return null;
});
} else {
@@ -302,6 +306,10 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
messageAck.increment();
}
).exceptionally(throwable ->{
+ if
(PerfClientUtils.hasInterruptedException(throwable)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
log.error("Ack message {} failed with exception",
msg, throwable);
totalMessageAckFailed.increment();
return null;
@@ -324,6 +332,10 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
numTxnOpSuccess.increment();
})
.exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
log.error("Commit transaction failed with
exception : ", exception);
totalEndTxnOpFailNum.increment();
return null;
@@ -336,6 +348,10 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
totalEndTxnOpSuccessNum.increment();
numTxnOpSuccess.increment();
}).exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
log.error("Abort transaction {} failed with exception",
transaction.getTxnID().toString(),
exception);
@@ -343,7 +359,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
return null;
});
}
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Transaction newTransaction =
pulsarClient.newTransaction()
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
@@ -354,8 +370,12 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
messageReceiveLimiter.release(this.numMessagesPerTransaction);
break;
} catch (Exception e) {
- log.error("Failed to new transaction with exception:",
e);
- totalNumTxnOpenFail.increment();
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error("Failed to new transaction with
exception:", e);
+ totalNumTxnOpenFail.increment();
+ }
}
}
}
@@ -408,11 +428,10 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
printAggregatedThroughput(start);
printAggregatedStats();
- }));
-
+ });
long oldTime = System.nanoTime();
@@ -432,10 +451,11 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
histogramLogWriter.outputLegend();
}
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
@@ -508,8 +528,8 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
}
}
}
-
- pulsarClient.close();
+ PerfClientUtils.closeClient(pulsarClient);
+ PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
}
private void printAggregatedThroughput(long start) {
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index ba5be3a3c45..8860696321a 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,10 +81,6 @@ import picocli.CommandLine.TypeConversionException;
*/
@Command(name = "produce", description = "Test pulsar producer performance.")
public class PerformanceProducer extends PerformanceTopicListArguments{
-
- private static final ExecutorService executor = Executors
- .newCachedThreadPool(new
DefaultThreadFactory("pulsar-perf-producer-exec"));
-
private static final LongAdder messagesSent = new LongAdder();
private static final LongAdder messagesFailed = new LongAdder();
private static final LongAdder bytesSent = new LongAdder();
@@ -287,11 +284,13 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- executorShutdownNow();
+ ExecutorService executor = Executors
+ .newCachedThreadPool(new
DefaultThreadFactory("pulsar-perf-producer-exec"));
+ Thread shutdownHookThread = PerfClientUtils.addShutdownHook(() -> {
+ executorShutdownNow(executor);
printAggregatedThroughput(start);
printAggregatedStats();
- }));
+ });
if (this.partitions != null) {
final PulsarAdminBuilder adminBuilder = PerfClientUtils
@@ -358,10 +357,11 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
histogramLogWriter.outputLegend();
}
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
@@ -412,12 +412,15 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
oldTime = now;
}
+
+ PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
}
+
public PerformanceProducer() {
super("produce");
}
- private static void executorShutdownNow() {
+ private static void executorShutdownNow(ExecutorService executor) {
executor.shutdownNow();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
@@ -426,6 +429,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
}
} catch (InterruptedException e) {
log.warn("Shutdown of thread pool was interrupted");
+ Thread.currentThread().interrupt();
}
}
@@ -435,6 +439,9 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
Class clz = classLoader.loadClass(formatterClass);
return (IMessageFormatter)
clz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ }
return null;
}
}
@@ -552,7 +559,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
AtomicLong totalSent = new AtomicLong(0);
AtomicLong numMessageSend = new AtomicLong(0);
Semaphore numMsgPerTxnLimit = new
Semaphore(this.numMessagesPerTransaction);
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
if (produceEnough) {
break;
}
@@ -601,6 +608,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
numMsgPerTxnLimit.acquire();
} catch (InterruptedException exception){
log.error("Get exception: ", exception);
+ Thread.currentThread().interrupt();
}
}
messageBuilder = producer.newMessage(transaction)
@@ -642,7 +650,16 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
}).exceptionally(ex -> {
// Ignore the exception of recorder since a very large
latencyMicros will lead
// ArrayIndexOutOfBoundsException in AbstractHistogram
- if (ex.getCause() instanceof
ArrayIndexOutOfBoundsException) {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof ArrayIndexOutOfBoundsException) {
+ return null;
+ }
+ // Ignore the exception when the producer is closed
+ if (cause instanceof
PulsarClientException.AlreadyClosedException) {
+ return null;
+ }
+ if (PerfClientUtils.hasInterruptedException(ex)) {
+ Thread.currentThread().interrupt();
return null;
}
log.warn("Write message error with exception", ex);
@@ -665,6 +682,10 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
numTxnOpSuccess.increment();
})
.exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
log.error("Commit transaction failed
with exception : ",
exception);
totalEndTxnOpFailNum.increment();
@@ -678,6 +699,10 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
totalEndTxnOpSuccessNum.increment();
numTxnOpSuccess.increment();
}).exceptionally(exception -> {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
log.error("Abort transaction {} failed with
exception",
transaction.getTxnID().toString(),
exception);
@@ -685,7 +710,7 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
return null;
});
}
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Transaction newTransaction =
pulsarClient.newTransaction()
.withTransactionTimeout(this.transactionTimeout,
@@ -696,26 +721,28 @@ public class PerformanceProducer extends
PerformanceTopicListArguments{
totalNumTxnOpenTxnSuccess.increment();
break;
} catch (Exception e){
- totalNumTxnOpenTxnFail.increment();
- log.error("Failed to new transaction with
exception: ", e);
+ if
(PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ totalNumTxnOpenTxnFail.increment();
+ log.error("Failed to new transaction with
exception: ", e);
+ }
}
}
}
}
}
} catch (Throwable t) {
- log.error("Got error", t);
+ if (PerfClientUtils.hasInterruptedException(t)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error("Got error", t);
+ }
} finally {
if (!produceEnough) {
doneLatch.countDown();
}
- if (null != client) {
- try {
- client.close();
- } catch (PulsarClientException e) {
- log.error("Failed to close test client", e);
- }
- }
+ PerfClientUtils.closeClient(client);
}
}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 3c6940b262f..d4170509a6e 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.testclient;
+import static org.apache.pulsar.testclient.PerfClientUtils.addShutdownHook;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
@@ -163,10 +164,10 @@ public class PerformanceReader extends
PerformanceTopicListArguments {
log.info("Start reading from {} topics", this.numTopics);
final long start = System.nanoTime();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Thread shutdownHookThread = addShutdownHook(() -> {
printAggregatedThroughput(start);
printAggregatedStats();
- }));
+ });
if (this.testTime > 0) {
TimerTask timoutTask = new TimerTask() {
@@ -184,10 +185,11 @@ public class PerformanceReader extends
PerformanceTopicListArguments {
long oldTime = System.nanoTime();
Histogram reportHistogram = null;
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
@@ -211,7 +213,8 @@ public class PerformanceReader extends
PerformanceTopicListArguments {
oldTime = now;
}
- pulsarClient.close();
+ PerfClientUtils.closeClient(pulsarClient);
+ PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
}
private static void printAggregatedThroughput(long start) {
double elapsed = (System.nanoTime() - start) / 1e9;
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 943cfaf4510..27f156eb033 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.testclient;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.pulsar.testclient.PerfClientUtils.addShutdownHook;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
@@ -212,7 +213,8 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
ClientBuilder clientBuilder =
PerfClientUtils.createClientBuilderFromArguments(this)
.enableTransaction(!this.isDisableTransaction);
- try (PulsarClient client = clientBuilder.build()) {
+ PulsarClient client = clientBuilder.build();
+ try {
ExecutorService executorService = new
ThreadPoolExecutor(this.numTestThreads,
this.numTestThreads,
@@ -222,14 +224,14 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (this.testTime * 1e9);
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ Thread shutdownHookThread = addShutdownHook(() -> {
if (!this.isDisableTransaction) {
printTxnAggregatedThroughput(startTime);
} else {
printAggregatedThroughput(startTime);
}
printAggregatedStats();
- }));
+ });
// start perf test
AtomicBoolean executing = new AtomicBoolean(true);
@@ -257,13 +259,17 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
atomicReference = new AtomicReference<>(null);
}
} catch (Exception e) {
- log.error("Failed to build Producer/Consumer with
exception : ", e);
+ if (PerfClientUtils.hasInterruptedException(e)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error("Failed to build Producer/Consumer with
exception : ", e);
+ }
executorService.shutdownNow();
PerfClientUtils.exit(1);
}
//The while loop has no break, and finally ends the
execution through the shutdownNow of
//the executorService
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
if (this.numTransactions > 0) {
if (totalNumTxnOpenTxnFail.sum()
+ totalNumTxnOpenTxnSuccess.sum() >=
this.numTransactions) {
@@ -309,7 +315,8 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
messageAckCumulativeRecorder.recordValue(latencyMicros);
numMessagesAckSuccess.increment();
}).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+
Thread.currentThread().interrupt();
return null;
}
log.error(
@@ -326,7 +333,8 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
messageAckCumulativeRecorder.recordValue(latencyMicros);
numMessagesAckSuccess.increment();
}).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+
Thread.currentThread().interrupt();
return null;
}
log.error(
@@ -352,7 +360,13 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
messageSendRCumulativeRecorder.recordValue(latencyMicros);
numMessagesSendSuccess.increment();
}).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+
Thread.currentThread().interrupt();
+ return null;
+ }
+ // Ignore the exception when
the producer is closed
+ if (exception.getCause()
+ instanceof
PulsarClientException.AlreadyClosedException) {
return null;
}
log.error("Send transaction
message failed with exception : ",
@@ -369,7 +383,13 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
messageSendRCumulativeRecorder.recordValue(latencyMicros);
numMessagesSendSuccess.increment();
}).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+
Thread.currentThread().interrupt();
+ return null;
+ }
+ // Ignore the exception when
the producer is closed
+ if (exception.getCause()
+ instanceof
PulsarClientException.AlreadyClosedException) {
return null;
}
log.error("Send message failed
with exception : ", exception);
@@ -390,7 +410,8 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
numTxnOpSuccess.increment();
totalNumEndTxnOpSuccess.increment();
}).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+
Thread.currentThread().interrupt();
return null;
}
log.error("Commit transaction {}
failed with exception",
@@ -404,7 +425,8 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
numTxnOpSuccess.increment();
totalNumEndTxnOpSuccess.increment();
}).exceptionally(exception -> {
- if (exception instanceof
InterruptedException && !executing.get()) {
+ if
(PerfClientUtils.hasInterruptedException(exception)) {
+ Thread.currentThread().interrupt();
return null;
}
log.error("Commit transaction {} failed
with exception",
@@ -414,7 +436,7 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
return null;
});
}
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
Transaction newTransaction =
client.newTransaction()
.withTransactionTimeout(this.transactionTimeout, TimeUnit.SECONDS)
@@ -424,11 +446,12 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
totalNumTxnOpenTxnSuccess.increment();
break;
} catch (Exception throwable) {
- if (throwable instanceof
InterruptedException && !executing.get()) {
- break;
+ if
(PerfClientUtils.hasInterruptedException(throwable)) {
+ Thread.currentThread().interrupt();
+ } else {
+ log.error("Failed to new transaction
with exception: ", throwable);
+ totalNumTxnOpenTxnFail.increment();
}
- log.error("Failed to new transaction with
exception: ", throwable);
- totalNumTxnOpenTxnFail.increment();
}
}
} else {
@@ -457,10 +480,11 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
histogramLogWriter.outputLogFormatVersion();
histogramLogWriter.outputLegend();
- while (executing.get()) {
+ while (!Thread.currentThread().isInterrupted() && executing.get())
{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
long now = System.nanoTime();
@@ -502,6 +526,10 @@ public class PerformanceTransaction extends
PerformanceBaseArguments{
oldTime = now;
}
+
+ PerfClientUtils.removeAndRunShutdownHook(shutdownHookThread);
+ } finally {
+ PerfClientUtils.closeClient(client);
}
}
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 519bed6cdb5..c7f2077bc5d 100644
---
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -217,20 +217,20 @@ public class PerformanceProducerTest extends
MockedPulsarServiceBaseTest {
String topic = testTopic + UUID.randomUUID().toString();
String args = String.format(argString, topic,
pulsar.getBrokerServiceUrl(), pulsar.getWebServiceAddress());
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub")
- .subscriptionType(SubscriptionType.Key_Shared).subscribe();
- new Thread(() -> {
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+ Thread thread = new Thread(() -> {
try {
PerformanceProducer producer = new PerformanceProducer();
producer.run(args.split(" "));
} catch (Exception e) {
log.error("Failed to start perf producer");
}
- }).start();
- Awaitility.await()
- .untilAsserted(() -> {
- Message<byte[]> message = consumer.receive(3,
TimeUnit.SECONDS);
- assertNotNull(message);
- });
+ });
+ thread.start();
+ Message<byte[]> message = consumer.receive(15, TimeUnit.SECONDS);
+ assertNotNull(message);
+ thread.interrupt();
+ thread.join();
consumer.close();
}