This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new a44c375ebb5 CAMEL-20256: adjust ThroughputLogger to use the StopWatch (#12508) a44c375ebb5 is described below commit a44c375ebb5c2c4b6a3651a815b8ebbd7c30a386 Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com> AuthorDate: Wed Dec 20 13:34:15 2023 -0300 CAMEL-20256: adjust ThroughputLogger to use the StopWatch (#12508) --- .../camel/support/processor/ThroughputLogger.java | 52 +++++++--------------- 1 file changed, 15 insertions(+), 37 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java index a72555c8bcd..9ebc7341b27 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java @@ -30,6 +30,7 @@ import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,8 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc private Integer groupSize; private long groupDelay = 1000; private Long groupInterval; - private long startTime; - private long groupStartTime; + private final StopWatch groupWatch = new StopWatch(); + private final StopWatch testWatch = new StopWatch(); private String action = "Received"; private CamelContext camelContext; private ScheduledExecutorService logSchedulerService; @@ -101,9 +102,6 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc @Override public void process(Exchange exchange) throws Exception { - if (startTime == 0) { - startTime = System.currentTimeMillis(); - } long receivedCount = receivedCounter.incrementAndGet(); //only process if groupSize is set...otherwise we're in groupInterval mode @@ -181,9 +179,7 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc } public void reset() { - startTime = 0; receivedCounter.set(0); - groupStartTime = 0; groupReceivedCount = 0; average = 0.0d; rate = 0.0d; @@ -230,18 +226,13 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc } protected String createLogMessage(Exchange exchange, long receivedCount) { - long time = System.currentTimeMillis(); - if (groupStartTime == 0) { - groupStartTime = startTime; - } - - rate = messagesPerSecond(groupSize, groupStartTime, time); - average = messagesPerSecond(receivedCount, startTime, time); + final long groupDuration = groupWatch.takenAndRestart(); + final long testDuration = testWatch.taken(); - long duration = time - groupStartTime; - groupStartTime = time; + rate = messagesPerSecond(groupSize, groupDuration); + average = messagesPerSecond(receivedCount, testDuration); - return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration + return getAction() + ": " + receivedCount + " messages so far. Last group took: " + groupDuration + " millis which is: " + numberFormat.format(rate) + " messages per second. average: " + numberFormat.format(average); } @@ -265,45 +256,32 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc } protected void createGroupIntervalLogMessage() { - - // this indicates that no messages have been received yet...don't logger yet - if (startTime == 0) { - return; - } - long receivedCount = receivedCounter.get(); - // if configured, hide logger messages when no new messages have been received if (groupActiveOnly && receivedCount == groupReceivedCount) { return; } - long time = System.currentTimeMillis(); - if (groupStartTime == 0) { - groupStartTime = startTime; - } + final long groupDuration = groupWatch.takenAndRestart(); + final long testDuration = testWatch.taken(); - long duration = time - groupStartTime; long currentCount = receivedCount - groupReceivedCount; - rate = messagesPerSecond(currentCount, groupStartTime, time); - average = messagesPerSecond(receivedCount, startTime, time); + rate = messagesPerSecond(currentCount, groupDuration); + average = messagesPerSecond(receivedCount, testDuration); - groupStartTime = time; groupReceivedCount = receivedCount; lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount - + " so far. Last group took: " + duration + + " so far. Last group took: " + groupDuration + " millis which is: " + numberFormat.format(rate) + " messages per second. average: " + numberFormat.format(average); logger.log(lastLogMessage); } - protected double messagesPerSecond(long messageCount, long startTime, long endTime) { + protected double messagesPerSecond(long messageCount, long duration) { // timeOneMessage = elapsed / messageCount // messagePerSend = 1000 / timeOneMessage - double rate = messageCount * 1000.0; - rate /= endTime - startTime; - return rate; + return (messageCount * 1000.0) / duration; } }