This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 5f43fc6  Add test time for perf consumer and perf reader (#7044)
5f43fc6 is described below

commit 5f43fc6547b104dc367e8359cf5d4aad06670b0f
Author: lipenghui <peng...@apache.org>
AuthorDate: Wed May 27 11:32:48 2020 +0800

    Add test time for perf consumer and perf reader (#7044)
---
 .../org/apache/pulsar/testclient/PerformanceConsumer.java | 14 +++++++++++++-
 .../org/apache/pulsar/testclient/PerformanceReader.java | 13 ++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 443f3bd..b667eff 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -136,6 +136,10 @@ public class PerformanceConsumer {
         @Parameter(names = { "-v", "--encryption-key-value-file" }, description = "The file which contains the private key to decrypt payload")
         public String encKeyFile = null;
+
+        @Parameter(names = { "-time",
+                "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+        public long testTime = 0;
     }
     public static void main(String[] args) throws Exception {
@@ -200,8 +204,16 @@ public class PerformanceConsumer {
         final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
-
+        long startTime = System.nanoTime();
+        long testEndTime = startTime + (long) (arguments.testTime * 1e9);
         MessageListener<byte[]> listener = (consumer, msg) -> {
+            if (arguments.testTime > 0) {
+                if (System.nanoTime() > testEndTime) {
+                    log.info("------------------- DONE -----------------------");
+                    printAggregatedStats();
+                    System.exit(0);
+                }
+            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 73db7d7..9e5480e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -106,6 +106,10 @@ public class PerformanceReader {
         @Parameter(names = { "--trust-cert-file" }, description = "Path for the trusted TLS certificate file")
         public String tlsTrustCertsFilePath = "";
+
+        @Parameter(names = { "-time",
+                "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
+        public long testTime = 0;
     }
     public static void main(String[] args) throws Exception {
@@ -174,8 +178,15 @@ public class PerformanceReader {
         final TopicName prefixTopicName = TopicName.get(arguments.topic.get(0));
         final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null;
-
+        long startTime = System.nanoTime();
+        long testEndTime = startTime + (long) (arguments.testTime * 1e9);
         ReaderListener<byte[]> listener = (reader, msg) -> {
+            if (arguments.testTime > 0) {
+                if (System.nanoTime() > testEndTime) {
+                    log.info("------------------- DONE -----------------------");
+                    System.exit(0);
+                }
+            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);