This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 7978344cc84 (chores) camel-disruptor: reduce method size
7978344cc84 is described below

commit 7978344cc8417cf6e5116b3a4527338212dfcd80
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Wed Oct 18 16:45:22 2023 +0200

    (chores) camel-disruptor: reduce method size
    
    This should help profilers provide better reports and help the JVM inline
    methods when possible
---
 .../component/disruptor/DisruptorConsumer.java     | 41 ++++++-----
 .../component/disruptor/DisruptorProducer.java     | 79 +++++++++++-----------
 2 files changed, 65 insertions(+), 55 deletions(-)

diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 1a8b1128b86..601789d2593 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -162,31 +162,38 @@ public class DisruptorConsumer extends ServiceSupport 
implements Consumer, Suspe
             // (see 
org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done).
             // To solve this problem, a new synchronization is set on the 
exchange that is to be
             // processed
-            result.getExchangeExtension().addOnCompletion(new 
Synchronization() {
-                @Override
-                public void onComplete(Exchange exchange) {
-                    synchronizedExchange.consumed(result);
-                }
-
-                @Override
-                public void onFailure(Exchange exchange) {
-                    synchronizedExchange.consumed(result);
-                }
-            });
+            
result.getExchangeExtension().addOnCompletion(newSynchronization(synchronizedExchange,
 result));
 
             // As the necessary post-processing of the exchange is done by the 
registered Synchronization,
             // we can suffice with a no-op AsyncCallback
             processor.process(result, NOOP_ASYNC_CALLBACK);
 
         } catch (Exception e) {
-            Exchange exchange = synchronizedExchange.getExchange();
+            handleException(synchronizedExchange, e);
+        }
+    }
+
+    private static Synchronization newSynchronization(SynchronizedExchange 
synchronizedExchange, Exchange result) {
+        return new Synchronization() {
+            @Override
+            public void onComplete(Exchange exchange) {
+                synchronizedExchange.consumed(result);
+            }
 
-            if (exchange != null) {
-                getExceptionHandler().handleException("Error processing 
exchange",
-                        exchange, e);
-            } else {
-                getExceptionHandler().handleException(e);
+            @Override
+            public void onFailure(Exchange exchange) {
+                synchronizedExchange.consumed(result);
             }
+        };
+    }
+
+    private void handleException(SynchronizedExchange synchronizedExchange, 
Exception e) {
+        Exchange exchange = synchronizedExchange.getExchange();
+
+        if (exchange != null) {
+            getExceptionHandler().handleException("Error processing exchange", 
exchange, e);
+        } else {
+            getExceptionHandler().handleException(e);
         }
     }
 
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
index 49a41a59132..220e9b86ab6 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
@@ -43,9 +43,10 @@ public class DisruptorProducer extends DefaultAsyncProducer {
     private final DisruptorEndpoint endpoint;
     private boolean blockWhenFull;
 
-    public DisruptorProducer(final DisruptorEndpoint endpoint,
-                             final WaitForTaskToComplete waitForTaskToComplete,
-                             final long timeout, boolean blockWhenFull) {
+    public DisruptorProducer(
+            final DisruptorEndpoint endpoint,
+            final WaitForTaskToComplete waitForTaskToComplete,
+            final long timeout, boolean blockWhenFull) {
         super(endpoint);
         this.waitForTaskToComplete = waitForTaskToComplete;
         this.timeout = timeout;
@@ -85,41 +86,7 @@ public class DisruptorProducer extends DefaultAsyncProducer {
             final CountDownLatch latch = new CountDownLatch(1);
 
             // we should wait for the reply so install a on completion so we 
know when its complete
-            copy.getExchangeExtension().addOnCompletion(new 
SynchronizationAdapter() {
-                @Override
-                public void onDone(final Exchange response) {
-                    // check for timeout, which then already would have 
invoked the latch
-                    if (latch.getCount() == 0) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("{}. Timeout occurred so response will 
be ignored: {}", this,
-                                    response.getMessage());
-                        }
-                    } else {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("{} with response: {}", this,
-                                    response.getMessage());
-                        }
-                        try {
-                            ExchangeHelper.copyResults(exchange, response);
-                        } finally {
-                            // always ensure latch is triggered
-                            latch.countDown();
-                        }
-                    }
-                }
-
-                @Override
-                public boolean allowHandover() {
-                    // do not allow handover as we want to seda producer to 
have its completion triggered
-                    // at this point in the routing (at this leg), instead of 
at the very last (this ensure timeout is honored)
-                    return false;
-                }
-
-                @Override
-                public String toString() {
-                    return "onDone at endpoint: " + endpoint;
-                }
-            });
+            
copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch));
 
             doPublish(copy);
 
@@ -176,6 +143,42 @@ public class DisruptorProducer extends 
DefaultAsyncProducer {
         return true;
     }
 
+    private SynchronizationAdapter newOnCompletion(Exchange exchange, 
CountDownLatch latch) {
+        return new SynchronizationAdapter() {
+            @Override
+            public void onDone(final Exchange response) {
+                // check for timeout, which then already would have invoked 
the latch
+                if (latch.getCount() == 0) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("{}. Timeout occurred so response will be 
ignored: {}", this, response.getMessage());
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("{} with response: {}", this, 
response.getMessage());
+                    }
+                    try {
+                        ExchangeHelper.copyResults(exchange, response);
+                    } finally {
+                        // always ensure latch is triggered
+                        latch.countDown();
+                    }
+                }
+            }
+
+            @Override
+            public boolean allowHandover() {
+                // do not allow handover as we want to seda producer to have 
its completion triggered
+                // at this point in the routing (at this leg), instead of at 
the very last (this ensure timeout is honored)
+                return false;
+            }
+
+            @Override
+            public String toString() {
+                return "onDone at endpoint: " + endpoint;
+            }
+        };
+    }
+
     private void doPublish(Exchange exchange) {
         LOG.trace("Publishing Exchange to disruptor ringbuffer: {}", exchange);
 

Reply via email to