This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new 0cfa5c424d7 CAMEL-21343: camel-seda - Allow to mimic the behavior of camel-vm (#15930)
0cfa5c424d7 is described below
commit 0cfa5c424d749944adcc826ec837fb0ce83e891a
Author: Nicolas Filotto <[email protected]>
AuthorDate: Fri Oct 11 18:44:42 2024 +0200
CAMEL-21343: camel-seda - Allow to mimic the behavior of camel-vm (#15930)
---
.../apache/camel/component/seda/SedaConsumer.java | 48 +++++++++++++++-------
1 file changed, 33 insertions(+), 15 deletions(-)
diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 5fdb9188710..6efe2ef8af0 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeExtension;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
@@ -181,19 +182,13 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
}
if (exchange != null) {
try {
- final Exchange target = exchange;
+ final Exchange original = exchange;
// prepare the exchange before sending to consumer
- prepareExchange(target);
+ final Exchange prepared = prepareExchange(exchange);
// callback to be executed when sending to consumer and processing is done
- AsyncCallback callback = doneSync -> {
- // log exception if an exception occurred and was not handled
- if (target.getException() != null) {
- getExceptionHandler().handleException("Error processing exchange", target,
- target.getException());
- }
- };
+ AsyncCallback callback = doneSync -> onProcessingDone(original, prepared);
// process the exchange
- sendToConsumers(target, callback);
+ sendToConsumers(prepared, callback);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
@@ -215,15 +210,38 @@ public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownA
}
}
+ /**
+ * Strategy to invoke when the exchange is done being processed.
+ * <p/>
+ * This method is meant to be overridden by subclasses to be able to mimic the behavior of the legacy component
+ * camel-vm, that is why the parameter {@code prepared} is not used by default.
+ *
+ * @param original the exchange before being processed
+ * @param prepared the exchange processed
+ */
+ protected void onProcessingDone(Exchange original, Exchange prepared) {
+ // log exception if an exception occurred and was not handled
+ if (original.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", original,
+ original.getException());
+ }
+ }
+
/**
* Strategy to prepare exchange for being processed by this consumer
+ * <p/>
+ * This method is meant to be overridden by subclasses to be able to mimic the behavior of the legacy component
+ * camel-vm, that is why the prepared exchange is returned.
*
- * @param exchange the exchange
+ * @param exchange the exchange
+ * @return the exchange to process by this consumer
*/
- protected void prepareExchange(Exchange exchange) {
- // this consumer grabbed the exchange so mark its from this route/endpoint
- exchange.getExchangeExtension().setFromEndpoint(getEndpoint());
- exchange.getExchangeExtension().setFromRouteId(getRouteId());
+ protected Exchange prepareExchange(Exchange exchange) {
+ // this consumer grabbed the exchange so mark it's from this route/endpoint
+ ExchangeExtension exchangeExtension = exchange.getExchangeExtension();
+ exchangeExtension.setFromEndpoint(getEndpoint());
+ exchangeExtension.setFromRouteId(getRouteId());
+ return exchange;
}
/**