This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6e8e0287ab0d1cb8dd5d5030c31920b75c137d15 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Mar 26 14:26:27 2023 +0200 CAMEL-18619: Stream closed for onException with useOriginalMessage --- .../camel/catalog/models/deadLetterChannel.json | 4 +- .../camel/catalog/models/defaultErrorHandler.json | 4 +- .../catalog/models/jtaTransactionErrorHandler.json | 4 +- .../apache/camel/catalog/models/onException.json | 4 +- .../models/springTransactionErrorHandler.json | 4 +- .../apache/camel/spi/StreamCachingStrategy.java | 10 ++ .../impl/engine/DefaultStreamCachingStrategy.java | 8 +- .../camel/impl/engine/DefaultUnitOfWork.java | 6 + .../model/errorhandler/deadLetterChannel.json | 4 +- .../model/errorhandler/defaultErrorHandler.json | 4 +- .../errorhandler/jtaTransactionErrorHandler.json | 4 +- .../springTransactionErrorHandler.json | 4 +- .../org/apache/camel/model/onException.json | 4 +- .../builder/LegacyDefaultErrorHandlerBuilder.java | 8 + .../apache/camel/model/OnCompletionDefinition.java | 13 ++ .../apache/camel/model/OnExceptionDefinition.java | 8 + .../DefaultErrorHandlerDefinition.java | 16 ++ .../OnExceptionUseOriginalMessageStreamTest.java | 178 +++++++++++++++++++++ 18 files changed, 265 insertions(+), 22 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json index f7b27f65585..dc2a0f0b286 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/deadLetterChannel.json @@ -17,8 +17,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json index 25dda486f6a..11865c18fbe 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/defaultErrorHandler.json @@ -15,8 +15,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json index 85063afcbf1..79677b14277 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/jtaTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json index 52beaddbb82..1f6dbd74ee3 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/onException.json @@ -21,8 +21,8 @@ "continued": { "kind": "expression", "displayName": "Continued", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "se [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown from this [...] - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "disabled": { "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json index e3792443f79..0c4999e7d93 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/springTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 4918f3974fa..3f27a0c2d52 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -19,6 +19,7 @@ package org.apache.camel.spi; import java.io.File; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.StaticService; import org.apache.camel.StreamCache; @@ -235,4 +236,13 @@ public interface StreamCachingStrategy extends StaticService { */ StreamCache cache(Exchange exchange); + /** + * Caches the body aas a {@link StreamCache}. + * + * @param message the message + * @return the body cached as a {@link StreamCache}, or <tt>null</tt> if not possible or no need to cache + * the body + */ + StreamCache cache(Message message); + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java index 0e418fe9808..8e6cb33a4bb 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java @@ -214,12 +214,16 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came @Override public StreamCache cache(Exchange exchange) { - Message message = exchange.getMessage(); + return cache(exchange.getMessage()); + } + + @Override + public StreamCache cache(Message message) { StreamCache cache = null; // try convert to stream cache Object body = message.getBody(); if (body != null) { - cache = camelContext.getTypeConverter().convertTo(StreamCache.class, exchange, body); + cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body); } if (cache != null) { if (LOG.isTraceEnabled()) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 58e4415b21f..4c33c7f75c1 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -32,6 +32,7 @@ import org.apache.camel.Message; import org.apache.camel.PooledExchange; import org.apache.camel.Processor; import org.apache.camel.Route; +import org.apache.camel.StreamCache; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationVetoable; @@ -101,6 +102,11 @@ public class DefaultUnitOfWork implements UnitOfWork { if (allowUseOriginalMessage) { this.originalInMessage = exchange.getIn().copy(); + // if the input body is streaming we need to cache it, so we can access the original input message + StreamCache cache = context.getStreamCachingStrategy().cache(this.originalInMessage); + if (cache != null) { + this.originalInMessage.setBody(cache); + } } // inject breadcrumb header if enabled diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json index f7b27f65585..dc2a0f0b286 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/deadLetterChannel.json @@ -17,8 +17,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json index 25dda486f6a..11865c18fbe 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/defaultErrorHandler.json @@ -15,8 +15,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json index 85063afcbf1..79677b14277 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/jtaTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json index e3792443f79..0c4999e7d93 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/errorhandler/springTransactionErrorHandler.json @@ -17,8 +17,8 @@ "loggerRef": { "kind": "attribute", "displayName": "Logger Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "References to a logger to use as logger for the error handler" }, "level": { "kind": "attribute", "displayName": "Level", "label": "advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR", "description": "Logging level to use when using the logging error handler type." }, "logName": { "kind": "attribute", "displayName": "Log Name", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name of the logger to use for the logging error handler" }, - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown [...] "onPrepareFailureRef": { "kind": "attribute", "displayName": "On Prepare Failure Ref", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.Processor", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor to prepare the org.apache.camel.Exchange before handled by the failure processor \/ dead letter channel. This allows for example to enrich the message before sending to a dead letter queue." }, diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json index 52beaddbb82..1f6dbd74ee3 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/onException.json @@ -21,8 +21,8 @@ "continued": { "kind": "expression", "displayName": "Continued", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "se [...] "onRedeliveryRef": { "kind": "attribute", "displayName": "On Redelivery Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed before a redelivery attempt. Can be used to change the org.apache.camel.Exchange before its being redelivered." }, "onExceptionOccurredRef": { "kind": "attribute", "displayName": "On Exception Occurred Ref", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a reference to a processor that should be processed just after an exception occurred. Can be used to perform custom logging about the occurred exception at the exact time it happened. Important: Any exception thrown from this [...] - "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] - "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] + "useOriginalMessage": { "kind": "attribute", "displayName": "Use Original Message", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message (original body and headers) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have fail [...] + "useOriginalBody": { "kind": "attribute", "displayName": "Use Original Body", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Will use the original input org.apache.camel.Message body (original body only) when an org.apache.camel.Exchange is moved to the dead letter queue. Notice: this only applies when all redeliveries attempt have failed and t [...] "disabled": { "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/LegacyDefaultErrorHandlerBuilder.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/LegacyDefaultErrorHandlerBuilder.java index d8888ec2f4a..40795a73a4c 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/builder/LegacyDefaultErrorHandlerBuilder.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/LegacyDefaultErrorHandlerBuilder.java @@ -348,6 +348,10 @@ public class LegacyDefaultErrorHandlerBuilder extends LegacyErrorHandlerBuilderS * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints @@ -385,6 +389,10 @@ public class LegacyDefaultErrorHandlerBuilder extends LegacyErrorHandlerBuilderS * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java index 729468dec22..b0c9cf6c4c1 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnCompletionDefinition.java @@ -214,6 +214,19 @@ public class OnCompletionDefinition extends OutputDefinition<OnCompletionDefinit /** * Will use the original input message body when an {@link org.apache.camel.Exchange} for this on completion. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> + * <b>Important:</b> The original input means the input message that are bounded by the current + * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they + * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints + * such as JMS or HTTP then the consumer will create a new unit of work, with the message it received as input as + * the original input. Also some EIP patterns such as splitter, multicast, will create a new unit of work boundary + * for the messages in their sub-route (eg the split message); however these EIPs have an option named + * <tt>shareUnitOfWork</tt> which allows to combine with the parent unit of work in regard to error handling and + * therefore use the parent original message. + * <p/> * By default this feature is off. * * @return the builder diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java index d2abb114fbd..ead6899aab0 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/OnExceptionDefinition.java @@ -668,6 +668,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints @@ -705,6 +709,10 @@ public class OnExceptionDefinition extends OutputDefinition<OnExceptionDefinitio * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java index 1b541c807e2..c8520cbe5f4 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerDefinition.java @@ -198,6 +198,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints @@ -235,6 +239,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints @@ -682,6 +690,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints @@ -719,6 +731,10 @@ public class DefaultErrorHandlerDefinition extends BaseErrorHandlerDefinition { * with custom headers and include the original message body. The former wont let you do this, as its using the * original message body and headers as they are. You cannot enable both useOriginalMessage and useOriginalBody. * <p/> + * The original input message is defensively copied, and the copied message body is converted to {@link org.apache.camel.StreamCache} + * if possible, to ensure the body can be read when the original message is being used later. If the body is not + * converted to {@link org.apache.camel.StreamCache} then the body will not be able to re-read when accessed later. + * <p/> * <b>Important:</b> The original input means the input message that are bounded by the current * {@link org.apache.camel.spi.UnitOfWork}. An unit of work typically spans one route, or multiple routes if they * are connected using internal endpoints such as direct or seda. When messages is passed via external endpoints diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTest.java new file mode 100644 index 00000000000..60c0ca60164 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionUseOriginalMessageStreamTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.onexception; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.converter.IOConverter; +import org.apache.camel.spi.DataFormat; +import org.apache.camel.support.service.ServiceSupport; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OnExceptionUseOriginalMessageStreamTest extends ContextTestSupport { + + @Test + void convertBodyWithStreamCache() { + // Cached stream is closed by explicit type converter + String data = "data"; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:convertBodyWithStreamCache", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Test + void convertBodyWithoutStreamCache() { + // Uncached stream is closed by reading with type converter + String data = "data"; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:convertBodyWithoutStreamCache", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Test + void unmarshallWithStreamCache() { + // Cached stream is closed unmarshalling + String data = """ + {"test": "data"} + """; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:unmarshallWithStreamCache", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Test + void unmarshallWithoutStreamCache() { + // Uncached stream is closed by reading with unmarshaller + String data = """ + {"test": "data"} + """; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:unmarshallWithoutStreamCache", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Test + void unmarshallInvalidWithoutStreamCache() { + // Uncached stream is closed by reading with unmarshaller + String data = """ + {"test": "data + """; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:convertBodyInvalidUnmarshallWithoutStreamCache", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Test + void noStreamReading() { + // Both cached and uncached streams are available because if it is not used + String data = "data"; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:noStreamReading", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Test + void setBodyAsExchangeProperty() { + // Data is converted to string and put in exchange property + String data = "data"; + InputStream is = new ByteArrayInputStream(data.getBytes()); + Object out = template.requestBody("direct:setBodyAsExchangeProperty", is, Object.class); + Assertions.assertEquals(data, out); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(ExceptionOne.class, MyDataFormatException.class) + .useOriginalMessage() + .convertBodyTo(String.class) + .handled(true); + + onException(ExceptionTwo.class) + .setBody(exchangeProperty("OriginalBody")) + .convertBodyTo(String.class) + .handled(true); + + from("direct:convertBodyWithStreamCache").streamCaching() + .convertBodyTo(String.class) + .throwException(new ExceptionOne()); + + from("direct:convertBodyWithoutStreamCache").noStreamCaching() + .convertBodyTo(String.class) + .throwException(new ExceptionOne()); + + from("direct:unmarshallWithStreamCache").streamCaching() + .unmarshal(new MyDataFormat()) + .throwException(new ExceptionOne()); + + from("direct:unmarshallWithoutStreamCache").noStreamCaching() + .unmarshal(new MyDataFormat()) + .throwException(new ExceptionOne()); + + from("direct:convertBodyInvalidUnmarshallWithoutStreamCache").noStreamCaching() + .convertBodyTo(String.class) + .unmarshal(new MyDataFormat()); + + from("direct:noStreamReading").streamCaching() + .throwException(new ExceptionOne()); + + from("direct:setBodyAsExchangeProperty").noStreamCaching() + .convertBodyTo(String.class) + .setProperty("OriginalBody", body()) + .throwException(new ExceptionTwo()); + } + }; + } + + public class ExceptionOne extends Exception { + + } + + public class ExceptionTwo extends Exception { + + } + + public class MyDataFormatException extends Exception { + + public MyDataFormatException(String message) { + super(message); + } + } + + public class MyDataFormat extends ServiceSupport implements DataFormat { + + @Override + public void marshal(Exchange exchange, Object graph, OutputStream stream) throws Exception { + // noop + } + + @Override + public Object unmarshal(Exchange exchange, InputStream stream) throws Exception { + // simulate reading the entire stream so its not re-readable later + String s = IOConverter.toString(stream, exchange); + throw new MyDataFormatException(s); + } + } +}