This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0037da02e9322681dbd0387a346107552d1aa411 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Jan 22 18:45:40 2022 +0100 CAMEL-17514: Fix MDC logging to ensure data is setup before processing. Error handler calling processors must do so in UoW. --- .../apache/camel/impl/engine/MDCUnitOfWork.java | 10 +++- .../errorhandler/DeadLetterChannelReifier.java | 6 +- .../errorhandler/DefaultErrorHandlerReifier.java | 6 +- .../reifier/errorhandler/ErrorHandlerReifier.java | 17 +++++- .../camel/processor/MDCErrorHandlerTest.java | 70 ++++++++++++++++++++++ 5 files changed, 99 insertions(+), 10 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java index 797e181..074521d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java @@ -65,6 +65,10 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { this.originalCamelContextId = MDC.get(MDC_CAMEL_CONTEXT_ID); this.originalTransactionKey = MDC.get(MDC_TRANSACTION_KEY); + prepareMDC(exchange); + } + + protected void prepareMDC(Exchange exchange) { // must add exchange and message id in constructor MDC.put(MDC_EXCHANGE_ID, exchange.getExchangeId()); String msgId = exchange.getMessage().getMessageId(); @@ -133,11 +137,14 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { @Override public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { + // prepare MDC before processing + prepareMDC(exchange); // add optional step id String stepId = exchange.getProperty(ExchangePropertyKey.STEP_ID, String.class); if (stepId != null) { MDC.put(MDC_STEP_ID, stepId); } + // return callback with after processing work return new MDCCallback(callback, pattern); } @@ -148,8 +155,7 @@ public class MDCUnitOfWork extends DefaultUnitOfWork implements Service { if (stepId == null) { MDC.remove(MDC_STEP_ID); } - - // clear to avoid leaking to current thread when + // clear MDC to avoid leaking to current thread when // the exchange is continued routed asynchronously clear(); } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DeadLetterChannelReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DeadLetterChannelReifier.java index 8cb6a59..7fb4f12 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DeadLetterChannelReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DeadLetterChannelReifier.java @@ -47,14 +47,14 @@ public class DeadLetterChannelReifier extends DefaultErrorHandlerReifier<DeadLet DeadLetterChannel answer = new DeadLetterChannel( camelContext, processor, logger, - getBean(Processor.class, definition.getOnRedelivery(), definition.getOnRedeliveryRef()), + getProcessor(definition.getOnRedelivery(), definition.getOnRedeliveryRef()), redeliveryPolicy, deadLetterProcessor, definition.getDeadLetterUri(), definition.isDeadLetterHandleNewException(), definition.isUseOriginalMessage(), definition.isUseOriginalBody(), definition.getRetryWhilePolicy(camelContext), getExecutorService(definition.getExecutorService(), definition.getExecutorServiceRef()), - getBean(Processor.class, definition.getOnPrepareFailure(), definition.getOnPrepareFailureRef()), - getBean(Processor.class, definition.getOnExceptionOccurred(), definition.getOnExceptionOccurredRef())); + getProcessor(definition.getOnPrepareFailure(), definition.getOnPrepareFailureRef()), + getProcessor(definition.getOnExceptionOccurred(), definition.getOnExceptionOccurredRef())); // configure error handler before we can use it configure(answer); return answer; diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java index 55f37b9..653487e0 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/DefaultErrorHandlerReifier.java @@ -43,12 +43,12 @@ public class DefaultErrorHandlerReifier<T extends DefaultErrorHandlerProperties> DefaultErrorHandler answer = new DefaultErrorHandler( camelContext, processor, logger, - getBean(Processor.class, definition.getOnRedelivery(), definition.getOnRedeliveryRef()), + getProcessor(definition.getOnRedelivery(), definition.getOnRedeliveryRef()), redeliveryPolicy, getPredicate(definition.getRetryWhile(), definition.getRetryWhileRef()), getExecutorService(definition.getExecutorService(), definition.getExecutorServiceRef()), - getBean(Processor.class, definition.getOnPrepareFailure(), definition.getOnPrepareFailureRef()), - getBean(Processor.class, definition.getOnExceptionOccurred(), definition.getOnExceptionOccurredRef())); + getProcessor(definition.getOnPrepareFailure(), definition.getOnPrepareFailureRef()), + getProcessor(definition.getOnExceptionOccurred(), definition.getOnExceptionOccurredRef())); // configure error handler before we can use it configure(answer); return answer; diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java index 1bfeb16..7ac9428 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java @@ -25,6 +25,7 @@ import java.util.function.BiFunction; import org.apache.camel.CamelContext; import org.apache.camel.ErrorHandlerFactory; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.LoggingLevel; import org.apache.camel.NamedNode; import org.apache.camel.Predicate; @@ -115,8 +116,8 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends if (retryWhile == null && def.getRetryWhile() != null) { retryWhile = createPredicate(def.getRetryWhile()); } - Processor onRedelivery = getBean(Processor.class, def.getOnRedelivery(), def.getOnRedeliveryRef()); - Processor onExceptionOccurred = getBean(Processor.class, def.getOnExceptionOccurred(), def.getOnExceptionOccurredRef()); + Processor onRedelivery = getProcessor(def.getOnRedelivery(), def.getOnRedeliveryRef()); + Processor onExceptionOccurred = getProcessor(def.getOnExceptionOccurred(), def.getOnExceptionOccurredRef()); return new ExceptionPolicy( def.getId(), CamelContextHelper.getRouteId(def), parseBoolean(def.getUseOriginalMessage(), false), @@ -353,4 +354,16 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends return bean; } + protected Processor getProcessor(Processor processor, String ref) { + if (processor == null) { + processor = getBean(Processor.class, null, ref); + } + if (processor != null) { + // must wrap the processor in an UoW + processor = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory() + .addUnitOfWorkProcessorAdvice(camelContext, processor, route); + } + return processor; + } + } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MDCErrorHandlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MDCErrorHandlerTest.java new file mode 100644 index 0000000..2d8a47c --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MDCErrorHandlerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.MDC; + +public class MDCErrorHandlerTest extends ContextTestSupport { + + @Test + public void testMDC() throws Exception { + template.sendBody("direct:start", "Hello World"); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setUseMDCLogging(true); + context.setUseBreadcrumb(true); + + errorHandler(deadLetterChannel("direct:dead").onExceptionOccurred(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + var m = MDC.getCopyOfContextMap(); + Assertions.assertEquals(5, m.size()); + Assertions.assertEquals(exchange.getMessage().getHeader(Exchange.BREADCRUMB_ID), m.get("camel.breadcrumbId")); + Assertions.assertEquals("start", m.get("camel.routeId")); + } + })); + + from("direct:start").routeId("start") + .to("log:before") + .throwException(new IllegalArgumentException("Forced")); + + from("direct:dead").routeId("dead") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + var m = MDC.getCopyOfContextMap(); + Assertions.assertEquals(5, m.size()); + Assertions.assertEquals(exchange.getMessage().getHeader(Exchange.BREADCRUMB_ID), m.get("camel.breadcrumbId")); + Assertions.assertEquals("dead", m.get("camel.routeId")); + } + }) + .to("log:dead"); + } + }; + } +}