This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 7978344cc84 (chores) camel-disruptor: reduce method size 7978344cc84 is described below commit 7978344cc8417cf6e5116b3a4527338212dfcd80 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Wed Oct 18 16:45:22 2023 +0200 (chores) camel-disruptor: reduce method size This should help profilers provide better reports and help the JVM inline methods when possible --- .../component/disruptor/DisruptorConsumer.java | 41 ++++++----- .../component/disruptor/DisruptorProducer.java | 79 +++++++++++----------- 2 files changed, 65 insertions(+), 55 deletions(-) diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index 1a8b1128b86..601789d2593 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -162,31 +162,38 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe // (see org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done). 
// To solve this problem, a new synchronization is set on the exchange that is to be // processed - result.getExchangeExtension().addOnCompletion(new Synchronization() { - @Override - public void onComplete(Exchange exchange) { - synchronizedExchange.consumed(result); - } - - @Override - public void onFailure(Exchange exchange) { - synchronizedExchange.consumed(result); - } - }); + result.getExchangeExtension().addOnCompletion(newSynchronization(synchronizedExchange, result)); // As the necessary post-processing of the exchange is done by the registered Synchronization, // we can suffice with a no-op AsyncCallback processor.process(result, NOOP_ASYNC_CALLBACK); } catch (Exception e) { - Exchange exchange = synchronizedExchange.getExchange(); + handleException(synchronizedExchange, e); + } + } + + private static Synchronization newSynchronization(SynchronizedExchange synchronizedExchange, Exchange result) { + return new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + synchronizedExchange.consumed(result); + } - if (exchange != null) { - getExceptionHandler().handleException("Error processing exchange", - exchange, e); - } else { - getExceptionHandler().handleException(e); + @Override + public void onFailure(Exchange exchange) { + synchronizedExchange.consumed(result); } + }; + } + + private void handleException(SynchronizedExchange synchronizedExchange, Exception e) { + Exchange exchange = synchronizedExchange.getExchange(); + + if (exchange != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } else { + getExceptionHandler().handleException(e); } } diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java index 49a41a59132..220e9b86ab6 100644 --- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java @@ -43,9 +43,10 @@ public class DisruptorProducer extends DefaultAsyncProducer { private final DisruptorEndpoint endpoint; private boolean blockWhenFull; - public DisruptorProducer(final DisruptorEndpoint endpoint, - final WaitForTaskToComplete waitForTaskToComplete, - final long timeout, boolean blockWhenFull) { + public DisruptorProducer( + final DisruptorEndpoint endpoint, + final WaitForTaskToComplete waitForTaskToComplete, + final long timeout, boolean blockWhenFull) { super(endpoint); this.waitForTaskToComplete = waitForTaskToComplete; this.timeout = timeout; @@ -85,41 +86,7 @@ public class DisruptorProducer extends DefaultAsyncProducer { final CountDownLatch latch = new CountDownLatch(1); // we should wait for the reply so install a on completion so we know when its complete - copy.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { - @Override - public void onDone(final Exchange response) { - // check for timeout, which then already would have invoked the latch - if (latch.getCount() == 0) { - if (LOG.isTraceEnabled()) { - LOG.trace("{}. 
Timeout occurred so response will be ignored: {}", this, - response.getMessage()); - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("{} with response: {}", this, - response.getMessage()); - } - try { - ExchangeHelper.copyResults(exchange, response); - } finally { - // always ensure latch is triggered - latch.countDown(); - } - } - } - - @Override - public boolean allowHandover() { - // do not allow handover as we want to seda producer to have its completion triggered - // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored) - return false; - } - - @Override - public String toString() { - return "onDone at endpoint: " + endpoint; - } - }); + copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch)); doPublish(copy); @@ -176,6 +143,42 @@ public class DisruptorProducer extends DefaultAsyncProducer { return true; } + private SynchronizationAdapter newOnCompletion(Exchange exchange, CountDownLatch latch) { + return new SynchronizationAdapter() { + @Override + public void onDone(final Exchange response) { + // check for timeout, which then already would have invoked the latch + if (latch.getCount() == 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("{}. 
Timeout occurred so response will be ignored: {}", this, response.getMessage()); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("{} with response: {}", this, response.getMessage()); + } + try { + ExchangeHelper.copyResults(exchange, response); + } finally { + // always ensure latch is triggered + latch.countDown(); + } + } + } + + @Override + public boolean allowHandover() { + // do not allow handover as we want to seda producer to have its completion triggered + // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored) + return false; + } + + @Override + public String toString() { + return "onDone at endpoint: " + endpoint; + } + }; + } + private void doPublish(Exchange exchange) { LOG.trace("Publishing Exchange to disruptor ringbuffer: {}", exchange);