This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 9075630 CAMEL-14354: camel-core optimize
9075630 is described below
commit 9075630f0942e8bf63627bf32020cc230ccbe329
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jan 27 19:40:19 2020 +0100
CAMEL-14354: camel-core optimize
---
.../src/main/java/org/apache/camel/Exchange.java | 1 +
.../java/org/apache/camel/ExtendedExchange.java | 10 ++++++
.../org/apache/camel/processor/CatchProcessor.java | 5 +--
.../apache/camel/processor/MulticastProcessor.java | 6 ++--
.../camel/processor/OnCompletionProcessor.java | 41 +++++++++++-----------
.../org/apache/camel/processor/PollEnricher.java | 2 +-
.../processor/aggregate/AggregateProcessor.java | 2 +-
.../errorhandler/RedeliveryErrorHandler.java | 14 ++++----
.../camel/builder/NoErrorHandlerBuilder.java | 3 +-
.../errorhandler/NoErrorHandlerReifier.java | 3 +-
.../BridgeExceptionHandlerToErrorHandler.java | 3 +-
.../org/apache/camel/support/DefaultExchange.java | 22 +++++++++---
.../org/apache/camel/support/ExchangeHelper.java | 10 ------
13 files changed, 72 insertions(+), 50 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/Exchange.java b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
index 364722f..5d6991d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Exchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Exchange.java
@@ -205,6 +205,7 @@ public interface Exchange {
String REDELIVERED = "CamelRedelivered";
String REDELIVERY_COUNTER = "CamelRedeliveryCounter";
String REDELIVERY_MAX_COUNTER = "CamelRedeliveryMaxCounter";
+ @Deprecated
String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted";
String REDELIVERY_DELAY = "CamelRedeliveryDelay";
String REST_HTTP_URI = "CamelRestHttpUri";
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index e3681e9..582102c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -118,4 +118,14 @@ public interface ExtendedExchange extends Exchange {
*/
void setInterrupted(boolean interrupted);
+ /**
+ * Whether the exchange has exhausted (attempted all) its redeliveries and still failed.
+ */
+ boolean isRedeliveryExhausted();
+
+ /**
+ * Used to signal that this exchange has exhausted (attempted all) its redeliveries and still failed.
+ */
+ void setRedeliveryExhausted(boolean redeliveryExhausted);
+
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java
index b766fb9..b9b3329 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -20,6 +20,7 @@ import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
@@ -102,7 +103,7 @@ public class CatchProcessor extends DelegateAsyncProcessor
implements Traceable,
exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
exchange.setException(null);
// and we should not be regarded as exhausted as we are in a try .. catch block
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+ exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
if (LOG.isDebugEnabled()) {
LOG.debug("The exception is handled for the exception: {} caused
by: {}",
@@ -118,7 +119,7 @@ public class CatchProcessor extends DelegateAsyncProcessor
implements Traceable,
EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange,
processor, false, null);
// always clear redelivery exhausted in a catch clause
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
if (!doneSync) {
// signal callback to continue routing async
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index e9bb034..95af8ad 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -44,6 +44,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -528,7 +529,8 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
// also we would need to know if any error handler has attempted redelivery and exhausted
boolean stoppedOnException = false;
boolean exception = false;
- boolean exhaust = forceExhaust || subExchange != null &&
(subExchange.getException() != null ||
ExchangeHelper.isRedeliveryExhausted(subExchange));
+ ExtendedExchange see = (ExtendedExchange) subExchange;
+ boolean exhaust = forceExhaust || see != null && (see.getException()
!= null || see.isRedeliveryExhausted());
if (original.getException() != null || subExchange != null &&
subExchange.getException() != null) {
// there was an exception and we stopped
stoppedOnException = isStopOnException();
@@ -553,7 +555,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
// multicast uses error handling on its output processors and they have tried to redeliver
// so we shall signal back to the other error handlers that we are exhausted and they should not
// also try to redeliver as we would then do that twice
- original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
+
original.adapt(ExtendedExchange.class).setRedeliveryExhausted(exhaust);
}
reactiveExecutor.schedule(callback);
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 0484619..0b812f3 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Ordered;
import org.apache.camel.Predicate;
@@ -144,22 +145,24 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
* @param exchange the exchange
*/
protected static void doProcess(Processor processor, Exchange exchange) {
+ ExtendedExchange ee = (ExtendedExchange) exchange;
// must remember some properties which we cannot use during onCompletion processing
// as otherwise we may cause issues
// but keep the caused exception stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
- boolean stop = exchange.isRouteStop();
- exchange.setRouteStop(false);
- Object failureHandled =
exchange.removeProperty(Exchange.FAILURE_HANDLED);
- Object errorhandlerHandled =
exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED);
- boolean rollbackOnly = exchange.isRollbackOnly();
- exchange.setRollbackOnly(false);
- boolean rollbackOnlyLast = exchange.isRollbackOnlyLast();
- exchange.setRollbackOnlyLast(false);
+ boolean stop = ee.isRouteStop();
+ ee.setRouteStop(false);
+ Object failureHandled = ee.removeProperty(Exchange.FAILURE_HANDLED);
+ Object errorhandlerHandled =
ee.removeProperty(Exchange.ERRORHANDLER_HANDLED);
+ boolean rollbackOnly = ee.isRollbackOnly();
+ ee.setRollbackOnly(false);
+ boolean rollbackOnlyLast = ee.isRollbackOnlyLast();
+ ee.setRollbackOnlyLast(false);
// and we should not be regarded as exhausted as we are in a onCompletion block
- Object exhausted =
exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+ boolean exhausted =
ee.adapt(ExtendedExchange.class).isRedeliveryExhausted();
+ ee.setRedeliveryExhausted(false);
- Exception cause = exchange.getException();
- exchange.setException(null);
+ Exception cause = ee.getException();
+ ee.setException(null);
try {
processor.process(exchange);
@@ -167,20 +170,18 @@ public class OnCompletionProcessor extends
AsyncProcessorSupport implements Trac
exchange.setException(e);
} finally {
// restore the options
- exchange.setRouteStop(stop);
+ ee.setRouteStop(stop);
if (failureHandled != null) {
- exchange.setProperty(Exchange.FAILURE_HANDLED, failureHandled);
+ ee.setProperty(Exchange.FAILURE_HANDLED, failureHandled);
}
if (errorhandlerHandled != null) {
- exchange.setProperty(Exchange.ERRORHANDLER_HANDLED,
errorhandlerHandled);
- }
- exchange.setRollbackOnly(rollbackOnly);
- exchange.setRollbackOnlyLast(rollbackOnlyLast);
- if (exhausted != null) {
- exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhausted);
+ ee.setProperty(Exchange.ERRORHANDLER_HANDLED,
errorhandlerHandled);
}
+ ee.setRollbackOnly(rollbackOnly);
+ ee.setRollbackOnlyLast(rollbackOnlyLast);
+ ee.setRedeliveryExhausted(exhausted);
if (cause != null) {
- exchange.setException(cause);
+ ee.setException(cause);
}
}
}
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 1eeb601..44e8757 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -306,7 +306,7 @@ public class PollEnricher extends AsyncProcessorSupport
implements IdAware, Rout
// restore caused exception
exchange.setException(cause);
// remove the exhausted marker as we want to be able to perform redeliveries with the error handler
- exchange.removeProperties(Exchange.REDELIVERY_EXHAUSTED);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
// preserve the redelivery stats
if (redeliveried != null) {
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 503e2c7..3146223 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1381,7 +1381,7 @@ public class AggregateProcessor extends
AsyncProcessorSupport implements Navigat
try {
// set redelivery counter
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
-
exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
} catch (Throwable e) {
exchange.setException(e);
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 8336458..073d292 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -268,7 +268,8 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport impleme
* Strategy to determine if the exchange is done so we can continue
*/
protected boolean isDone(Exchange exchange) {
- if (((ExtendedExchange) exchange).isInterrupted()) {
+ ExtendedExchange ee = (ExtendedExchange) exchange;
+ if (ee.isInterrupted()) {
// mark the exchange to stop continue routing when interrupted
// as we do not want to continue routing (for example a task has been cancelled)
if (LOG.isTraceEnabled()) {
@@ -283,7 +284,7 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport impleme
// or we are exhausted
boolean answer = exchange.getException() == null
|| ExchangeHelper.isFailureHandled(exchange)
- || ExchangeHelper.isRedeliveryExhausted(exchange);
+ || ee.isRedeliveryExhausted();
if (LOG.isTraceEnabled()) {
LOG.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(),
answer);
@@ -442,7 +443,7 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport impleme
// the task was rejected
exchange.setException(new
RejectedExecutionException("Redelivery not allowed while stopping"));
// mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
-
exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
// jump to start of loop which then detects that we are failed and exhausted
reactiveExecutor.schedule(this);
} else {
@@ -766,7 +767,7 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport impleme
exchange.getIn().removeHeader(Exchange.REDELIVERED);
exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
// and remove traces of rollback only and uow exhausted markers
exchange.setRollbackOnly(false);
@@ -1104,15 +1105,16 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport impleme
* @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
*/
private boolean isExhausted(Exchange exchange) {
+ ExtendedExchange ee = (ExtendedExchange) exchange;
// if marked as rollback only then do not continue/redeliver
- boolean exhausted = ExchangeHelper.isRedeliveryExhausted(exchange);
+ boolean exhausted = ee.isRedeliveryExhausted();
if (exhausted) {
LOG.trace("This exchange is marked as redelivery exhausted:
{}", exchange);
return true;
}
// if marked as rollback only then do not continue/redeliver
- boolean rollbackOnly = exchange.isRollbackOnly();
+ boolean rollbackOnly = ee.isRollbackOnly();
if (rollbackOnly) {
LOG.trace("This exchange is marked as rollback only, so
forcing it to be exhausted: {}", exchange);
return true;
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
index 6ccd34a..3565d35 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java
@@ -18,6 +18,7 @@ package org.apache.camel.builder;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -40,7 +41,7 @@ public class NoErrorHandlerBuilder extends
ErrorHandlerBuilderSupport {
return super.process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
callback.done(doneSync);
}
});
diff --git
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java
index 3beae5f..118297b 100644
---
a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java
+++
b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/NoErrorHandlerReifier.java
@@ -19,6 +19,7 @@ package org.apache.camel.reifier.errorhandler;
import org.apache.camel.AsyncCallback;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.NoErrorHandlerBuilder;
import org.apache.camel.spi.RouteContext;
@@ -38,7 +39,7 @@ public class NoErrorHandlerReifier extends
ErrorHandlerReifier<NoErrorHandlerBui
return super.process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
- exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false);
callback.done(doneSync);
}
});
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
index 5ca068a..4e027e6 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/BridgeExceptionHandlerToErrorHandler.java
@@ -17,6 +17,7 @@
package org.apache.camel.support;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UnitOfWork;
@@ -69,7 +70,7 @@ public class BridgeExceptionHandlerToErrorHandler implements
ExceptionHandler {
// and the message
exchange.getIn().setBody(message);
// and mark as redelivery exhausted as we cannot do redeliveries
- exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
+ exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
// wrap in UoW
UnitOfWork uow = null;
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 3961ec7..e46b54c 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -63,6 +63,7 @@ public final class DefaultExchange implements
ExtendedExchange {
private boolean rollbackOnlyLast;
private boolean notifyEvent;
private boolean interrupted;
+ private boolean redeliveryExhausted;
public DefaultExchange(CamelContext context) {
this(context, ExchangePattern.InOnly);
@@ -124,11 +125,12 @@ public final class DefaultExchange implements
ExtendedExchange {
}
}
- exchange.setException(getException());
- exchange.setRouteStop(isRouteStop());
- exchange.setRollbackOnly(isRollbackOnly());
- exchange.setRollbackOnlyLast(isRollbackOnlyLast());
- exchange.setNotifyEvent(isNotifyEvent());
+ exchange.setException(exception);
+ exchange.setRouteStop(routeStop);
+ exchange.setRollbackOnly(rollbackOnly);
+ exchange.setRollbackOnlyLast(rollbackOnlyLast);
+ exchange.setNotifyEvent(notifyEvent);
+ exchange.setRedeliveryExhausted(redeliveryExhausted);
// copy properties after body as body may trigger lazy init
if (hasProperties()) {
@@ -639,6 +641,16 @@ public final class DefaultExchange implements
ExtendedExchange {
this.interrupted = interrupted;
}
+ @Override
+ public boolean isRedeliveryExhausted() {
+ return redeliveryExhausted;
+ }
+
+ @Override
+ public void setRedeliveryExhausted(boolean redeliveryExhausted) {
+ this.redeliveryExhausted = redeliveryExhausted;
+ }
+
/**
* Configures the message after it has been set on the exchange
*/
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 26e91be..6bf16ff 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -585,16 +585,6 @@ public final class ExchangeHelper {
}
/**
- * Checks whether the exchange is redelivery exhausted
- *
- * @param exchange the exchange
- * @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
- */
- public static boolean isRedeliveryExhausted(Exchange exchange) {
- return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false,
Boolean.class);
- }
-
- /**
* Checks whether the exchange {@link UnitOfWork} is redelivered
*
* @param exchange the exchange