This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new b33bea6 CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering. b33bea6 is described below commit b33bea6cccbc797cf874884247fac69bf8ef33ae Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Nov 1 09:35:13 2021 +0100 CAMEL-17009: MDCUnitOfWork should clear when done/stopping and also after process to not leak state to other threads. CAMEL-17153: UnitOfWork afterProcess should be invoked via reactive executor to ensure ordering. --- .../camel/impl/engine/CamelInternalProcessor.java | 10 ++++++---- .../org/apache/camel/impl/engine/MDCUnitOfWork.java | 17 ++++++++++++++++- .../org/apache/camel/processor/MDCClearingTest.java | 3 --- ...t.java => MDCResetMidRouteProducerTemplateTest.java} | 4 +++- .../java/org/apache/camel/processor/MDCResetTest.java | 11 +---------- .../java/org/apache/camel/support/UnitOfWorkHelper.java | 6 ++++++ 6 files changed, 32 insertions(+), 19 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 25931fc..97ba072 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -382,8 +382,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } else { final UnitOfWork uow = exchange.getUnitOfWork(); - // do uow before processing and if a value is returned the the uow wants to be processed after - // was well in the same thread + // optimize to only do before uow processing if really needed AsyncCallback async = afterTask; boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess(); if (beforeAndAfter) { @@ -403,8 +402,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In // optimize to only do after uow processing if really needed if (beforeAndAfter) { - // execute any after processor work (in current thread, not in the callback) - uow.afterProcess(processor, exchange, afterTask, sync); + // use the same callback as with beforeProcess + final CamelInternalTask afterCallback = afterTask; + reactiveExecutor.schedule(() -> { + uow.afterProcess(processor, exchange, afterCallback, sync); + }); } if (LOG.isTraceEnabled()) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index db7d5d1..dd76a31 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -24,6 +24,7 @@ import org.apache.camel.Exchange; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Processor; import org.apache.camel.Route; +import org.apache.camel.Service; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.PatternHelper; @@ -34,7 +35,7 @@ import org.slf4j.MDC; /** * This unit of work supports <a href="http://www.slf4j.org/api/org/slf4j/MDC.html">MDC</a>. */ -public class MDCUnitOfWork extends DefaultUnitOfWork { +public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { private static final Logger LOG = LoggerFactory.getLogger(MDCUnitOfWork.class); @@ -147,6 +148,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork { if (stepId == null) { MDC.remove(MDC_STEP_ID); } + + // clear to avoid leaking to current thread when + // the exchange is continued routed asynchronously + clear(); } /** @@ -208,6 +213,16 @@ public class MDCUnitOfWork extends DefaultUnitOfWork { } @Override + public void start() { + // noop + } + + @Override + public void stop() { + clear(); + } + + @Override public String toString() { return "MDCUnitOfWork"; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java index acf37f0..9869156 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCClearingTest.java @@ -26,7 +26,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.support.AsyncProcessorSupport; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +56,6 @@ public class MDCClearingTest extends ContextTestSupport { } @Test - @Disabled public void shouldPropagateAndClearMdcInAsyncRoute() { // given MDC.remove(CAMEL_BREADCRUMB_ID); @@ -70,7 +68,6 @@ public class MDCClearingTest extends ContextTestSupport { } @Test - @Disabled public void shouldPropagateAndClearMdcInMixedRoute() { // given MDC.remove(CAMEL_BREADCRUMB_ID); diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java similarity index 96% copy from core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java copy to core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java index 0f8939d..8038b71 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetMidRouteProducerTemplateTest.java @@ -21,6 +21,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.MDC; @@ -30,7 +31,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; * Tests that MDC works as a stack remembering old values when using a producer template to send in new messages during * routing. */ -public class MDCResetTest extends ContextTestSupport { +@Disabled +public class MDCResetMidRouteProducerTemplateTest extends ContextTestSupport { @Test public void testMDC() throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java index 0f8939d..ee2cc2a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCResetTest.java @@ -27,8 +27,7 @@ import org.slf4j.MDC; import static org.junit.jupiter.api.Assertions.assertEquals; /** - * Tests that MDC works as a stack remembering old values when using a producer template to send in new messages during - * routing. + * Tests that MDC works as a stack remembering old values when routing between routes. */ public class MDCResetTest extends ContextTestSupport { @@ -68,14 +67,6 @@ public class MDCResetTest extends ContextTestSupport { } }).to("log:foo").to("direct:b").process(new Processor() { public void process(Exchange exchange) throws Exception { - String body = exchange.getIn().getBody(String.class); - // use a producer template to send to b, instead of in - // the route DSL - body = template.requestBody("direct:b", body, String.class); - exchange.getMessage().setBody(body); - } - }).process(new Processor() { - public void process(Exchange exchange) throws Exception { assertEquals("route-a", MDC.get("camel.routeId")); assertEquals(exchange.getExchangeId(), MDC.get("camel.exchangeId")); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java index 47309a4..8dafdcc 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java @@ -22,10 +22,12 @@ import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.Route; +import org.apache.camel.Service; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationRouteAware; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.spi.annotations.EagerClassloaded; +import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,10 +59,14 @@ public final class UnitOfWorkHelper { // unit of work is done try { uow.done(exchange); + if (uow instanceof Service) { + ServiceHelper.stopService(uow); + } } catch (Throwable e) { LOG.warn("Exception occurred during done UnitOfWork for Exchange: {}. This exception will be ignored.", exchange, e); } + } public static void doneSynchronizations(Exchange exchange, List<Synchronization> synchronizations, Logger log) {