Author: gertv Date: Tue Feb 3 10:15:34 2009 New Revision: 740263 URL: http://svn.apache.org/viewvc?rev=740263&view=rev Log: Merged revisions 740251 via svnmerge from https://svn.eu.apache.org/repos/asf/camel/trunk
........ r740251 | gertv | 2009-02-03 10:16:53 +0100 (Tue, 03 Feb 2009) | 1 line CAMEL-1271/CAMEL-520: Enable stream caching as an InterceptStrategy ........ Added: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java - copied, changed from r740251, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (with props) camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java - copied unchanged from r740251, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingTest.java camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java - copied unchanged from r740251, camel/trunk/camel-core/src/test/java/org/apache/camel/util/MessageHelperTest.java Modified: camel/branches/camel-1.x/ (props changed) camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Propchange: camel/branches/camel-1.x/ ------------------------------------------------------------------------------ --- 
svn:mergeinfo (original) +++ svn:mergeinfo Tue Feb 3 10:15:34 2009 @@ -1 +1 @@ -/camel/trunk:739733,739904 +/camel/trunk:739733,739904,740251 Propchange: camel/branches/camel-1.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (original) +++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Tue Feb 3 10:15:34 2009 @@ -25,6 +25,7 @@ import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; +import org.apache.camel.processor.interceptor.StreamCaching; import org.apache.camel.spi.RouteContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,6 +67,7 @@ public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { Processor deadLetter = getDeadLetterFactory().createProcessor(); DeadLetterChannel answer = new DeadLetterChannel(processor, deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy()); + StreamCaching.enable(routeContext); configure(answer); return answer; } Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java URL: 
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original) +++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Feb 3 10:15:34 2009 @@ -31,6 +31,7 @@ import org.apache.camel.model.LoggingLevel; import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.MessageHelper; import org.apache.camel.util.ServiceHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -174,6 +175,9 @@ exchange.setException(null); } + // reset cached streams so they can be read again + MessageHelper.resetStreamCache(exchange.getIn()); + // wait until we should redeliver data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); Copied: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (from r740251, camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java) URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java?p2=camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java&r1=740251&r2=740263&rev=740263&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java (original) +++ 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCaching.java Tue Feb 3 10:15:34 2009 @@ -24,7 +24,7 @@ /** * {@link InterceptStrategy} implementation to configure stream caching on a RouteContext */ -public class StreamCaching implements InterceptStrategy { +public final class StreamCaching implements InterceptStrategy { /* * Hide constructor -- instances will be created through static enable() methods Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original) +++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Tue Feb 3 10:15:34 2009 @@ -25,6 +25,7 @@ import org.apache.camel.model.InterceptorRef; import org.apache.camel.model.InterceptorType; import org.apache.camel.processor.Interceptor; +import org.apache.camel.util.MessageHelper; /** * {@link Interceptor} that converts a message into a re-readable format @@ -38,9 +39,9 @@ try { StreamCache newBody = exchange.getIn().getBody(StreamCache.class); if (newBody != null) { - newBody.reset(); exchange.getIn().setBody(newBody); } + MessageHelper.resetStreamCache(exchange.getIn()); } catch (NoTypeConversionAvailableException ex) { // ignore if in is not of StreamCache type } Added: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java?rev=740263&view=auto 
============================================================================== --- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java (added) +++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java Tue Feb 3 10:15:34 2009 @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import org.apache.camel.Message; +import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.converter.stream.StreamCache; + +/** + * Some helper methods when working with {@link org.apache.camel.Message}. + * + * @version $Revision: 740251 $ + */ +public final class MessageHelper { + + /** + * Utility classes should not have a public constructor. + */ + private MessageHelper() { + } + + /** + * Extracts the given body and returns it as a String, that + * can be used for logging etc. + * <p/> + * Will handle stream based bodies wrapped in StreamCache. 
+ * + * @param message the message with the body + * @return the body as String, can return <tt>null</null> if no body + */ + public static String extractBodyAsString(Message message) { + if (message == null) { + return null; + } + + StreamCache newBody = null; + try { + newBody = message.getBody(StreamCache.class); + if (newBody != null) { + message.setBody(newBody); + } + } catch (NoTypeConversionAvailableException ex) { + // ignore, in not of StreamCache type + } + + Object answer; + try { + answer = message.getBody(String.class); + } catch (NoTypeConversionAvailableException ex) { + answer = message.getBody(); + } + + if (newBody != null) { + // Reset the InputStreamCache + newBody.reset(); + } + + return answer != null ? answer.toString() : null; + } + + /** + * Gets the given body class type name as a String. + * <p/> + * Will skip java.lang. for the build in Java types. + * + * @param message the message with the body + * @return the body typename as String, can return <tt>null</null> if no body + */ + public static String getBodyTypeName(Message message) { + if (message == null) { + return null; + } + String answer = ObjectHelper.classCanonicalName(message.getBody()); + if (answer != null && answer.startsWith("java.lang.")) { + return answer.substring(10); + } + return answer; + } + + /** + * If the message body contains a {@link StreamCache} instance, reset the cache to + * enable reading from it again. 
+ * + * @param message the message for which to reset the body + */ + public static void resetStreamCache(Message message) { + if (message == null) { + return; + } + if (message.getBody() instanceof StreamCache) { + ((StreamCache) message.getBody()).reset(); + } + } +} Propchange: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java (original) +++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetSedaTest.java Tue Feb 3 10:15:34 2009 @@ -46,6 +46,9 @@ protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { + //TODO: remove this once the delegate processor supports async + errorHandler(noErrorHandler()); + from("dataset:foo").to("seda:queue:test?size=100"); from("seda:queue:test?size=100").to("dataset:foo"); } Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java 
(original) +++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/issues/CacheInputStreamInDeadLetterIssue520Test.java Tue Feb 3 10:15:34 2009 @@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.io.Reader; import java.io.StringReader; import javax.xml.transform.stream.StreamSource; @@ -68,23 +69,22 @@ protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - streamCaching(); errorHandler(deadLetterChannel("direct:errorHandler").maximumRedeliveries(3)); from("direct:start").process(new Processor() { public void process(Exchange exchange) throws Exception { count++; // Read the in stream from cache String result = exchange.getIn().getBody(String.class); - assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>"); + assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result); throw new Exception("Forced exception by unit test"); } }); //Need to set the streamCaching for the deadLetterChannel - from("direct:errorHandler").streamCaching().process(new Processor() { + from("direct:errorHandler").process(new Processor() { public void process(Exchange exchange) throws Exception { String result = exchange.getIn().getBody(String.class); - assertEquals("Should read the inputstream out again", result, "<hello>Willem</hello>"); + assertEquals("Should read the inputstream out again", "<hello>Willem</hello>", result); } }).to("mock:error"); Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original) +++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Tue Feb 3 10:15:34 2009 @@ -104,8 +104,12 @@ outputProcessor = interceptor.getProcessor(); } + // we are not interested in any other delegate processors in the route (e.g. stream caching) + while (outputProcessor instanceof DelegateProcessor) { + outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor(); + } + assertIsInstanceOf(StreamResequencer.class, outputProcessor); } - } Modified: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java (original) +++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/StreamSourceContentBasedRouterTest.java Tue Feb 3 10:15:34 2009 @@ -37,7 +37,7 @@ public void testSendStreamSource() throws Exception { x.expectedMessageCount(1); y.expectedMessageCount(1); - + sendBody("direct:start", new StreamSource(new StringReader("<message>xx</message>"))); sendBody("direct:start", new StreamSource(new StringReader("<message>yy</message>"))); @@ -65,7 +65,7 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("direct:start").convertBodyTo(String.class).choice() + from("direct:start").choice() .when().xpath("/message/text() = 'xx'").to("mock:x") .when().xpath("/message/text() = 'yy'").to("mock:y"); } Modified: 
camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=740263&r1=740262&r2=740263&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original) +++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Feb 3 10:15:34 2009 @@ -20,12 +20,16 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.model.ProcessorType; +import org.apache.camel.processor.interceptor.Tracer; +import org.apache.camel.spi.InterceptStrategy; /** * @version $Revision$ @@ -141,7 +145,8 @@ protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - inheritErrorHandler(false); + //TODO: revert this once we get DelegateProcessor to support async + setErrorHandlerBuilder(noErrorHandler()); // START SNIPPET: example from("direct:a").thread(1).process(new Processor() {