Hello,

I have a very simple example which shows that the problem does not occur only with Multicast. It occurs whenever a sub-route streams the message body to the file system via CachedOutputStream and the cached file is deleted at the end of the sub-route, even though the main route still needs access to the result body of the sub-route.
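To make the mechanism easier to see without Camel, here is a minimal plain-Java sketch of it. It is not the Camel implementation: MiniExchange, subRoute and mainRoute are hypothetical stand-ins, and the only assumption is that the spool file is deleted by an on-completion callback registered on the sub-route exchange. The Camel test case follows after it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Camel-free simulation; every name in this class is hypothetical.
class SpoolLifecycleSketch {

    /** Stand-in for an exchange: it only collects on-completion callbacks. */
    static class MiniExchange {
        private final List<Runnable> onCompletion = new ArrayList<>();
        Path body; // the "streamed" body is just the path of the spool file

        void addOnCompletion(Runnable callback) {
            onCompletion.add(callback);
        }

        /** Runs and clears all registered callbacks. */
        void done() {
            onCompletion.forEach(Runnable::run);
            onCompletion.clear();
        }
    }

    /** "Sub-route": spools the body to a temp file and registers its deletion. */
    static void subRoute(MiniExchange exchange) {
        try {
            Path spool = Files.createTempFile("cos", ".tmp");
            Files.write(spool, "Test Message 1".getBytes(StandardCharsets.UTF_8));
            exchange.addOnCompletion(() -> {
                try {
                    Files.deleteIfExists(spool);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            exchange.body = spool;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** "Main route": reads the sub-route result after the sub-exchange is done. */
    static String mainRoute() {
        MiniExchange sub = new MiniExchange();
        subRoute(sub);
        sub.done(); // the spool file is deleted here, before the body is read
        try {
            return new String(Files.readAllBytes(sub.body), StandardCharsets.UTF_8);
        } catch (NoSuchFileException e) {
            return "spool file already deleted";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(mainRoute()); // prints "spool file already deleted"
    }
}
```

The body handed back to the main route is only a reference to the spool file, so completing the sub-exchange first makes the later read fail.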

import java.io.InputStream;
import java.nio.charset.Charset;

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.junit.Test;

public class StreamCacheInSubRouteTest extends ContextTestSupport {

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // enable stream caching and spool to disk above 1 byte,
                // so that even the small test body ends up in a file
                context.setStreamCaching(true);
                context.getStreamCachingStrategy().setEnabled(true);
                context.getStreamCachingStrategy().setSpoolDirectory("target/camel/cache");
                context.getStreamCachingStrategy().setSpoolThreshold(1L);

                // main route: needs the body produced by the sub-route
                from("direct:startError")
                    .to("direct:subroute").to("mock:result");

                // sub-route: writes the body via CachedOutputStream
                from("direct:subroute") //
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            CachedOutputStream cos = new CachedOutputStream(exchange);
                            String s = "Test Message 1";
                            cos.write(s.getBytes(Charset.forName("UTF-8")));
                            cos.close();
                            InputStream is = (InputStream) cos.newStreamCache();
                            exchange.getOut().setBody(is);
                        }
                    }) //
                    .to("mock:result_subroute");
            }
        };
    }

    @Test
    public void test() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:result");
        mock.expectedMessageCount(1);
        template.sendBody("direct:startError", "<start></start>");
        // reading the body in the main route is where the deleted spool file is missed
        byte[] data = mock.getReceivedExchanges().get(0).getIn()
                .getBody(byte[].class);
        assertEquals("Test Message 1", new String(data, "UTF-8"));
    }
}

I will add this example to the JIRA ticket mentioned by Claus.

Regards
Franz

On Fri, Sep 5, 2014 at 1:23 PM, lakshmi.prashant <lakshmi.prash...@gmail.com> wrote:
> Hi,
>
> Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5756092/Mybeans.xml>
>
> *Issue: *
>
> Whenever data is spooled in file via CachedOutputStream in any camel
> component in a multicast branch, that data becomes unreadable in
>
> a) Aggregation Strategy of Multicast
> b) After multicast, in case there is no aggregation strategy
>
> We are getting:
>
> a) FileNotFound issues as the file is deleted on completion of the cloned
> branch exchange.
> b) Premature end of file, when we read data from InputStream and use
> XMLReader / STAX to read the data.
>
> If we use the Constructor: new CachedOutputStream(exchange, false), the
> streamcache file will not be deleted.
> But the file may never be cleaned up & we do not want to do that.
>
>
> *Details*
>
> We are using camel 2.13.2 - I have a multicast route with an
> AggregationStrategy.
> And in each multicast branch, we have a custom camel component that returns
> huge data (around 4 MB) and writes to StreamCache (CachedOutputStream) and
> we need to aggregate the data in the multicast (AggregationStrategy).
>
>
> In the Aggregation strategy, I need to do XPath evaluation using camel
> XPathBuilder.
> Hence, I try to read the body and convert from StreamCache to byte[] to
> avoid 'Error during type conversion from type:
> org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder.
>
> When I try to read the body in the beginning of the AggregationStrategy, I
> get the following error.
> *
> /tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
> (No such file or directory), cause:
> FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
> (No such file or directory).
> at java.io.FileInputStream.open(Native Method)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123)
> at org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117)
> at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93)
> at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)
> at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169)
> at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161)
>
> Following is the line of code where it is throwing an error:
>
> Object body = exchange.getIn().getBody();
> if (body instanceof StreamCache) {
>     StreamCache cache = (StreamCache) body;
>     xml = new String(convertToByteArray(cache, exchange));
>     exchange.getIn().setBody(xml);
> }
> *
>
>
>
> By disabling stream cache to write to file by setting a threshold of 10MB in
> multicast related routes, we were able to work with the aggregation
> strategy. But we do not want to do that, as we may have incoming data that
> may be bigger.
>
> <camel:camelContext id="multicast_xml_1" streamCache="true">
>   <camel:properties>
>     <camel:property key="CamelCachedOutputStreamCipherTransformation"
>                     value="RC4"/>
>     <camel:property key="CamelCachedOutputStreamThreshold"
>                     value="100000000"/>
>   </camel:properties>
>
> Note: The FileNotFound issue does not appear if we have the StreamCache
> based camel component in the route with other processors, but without
> Multicast + Aggregation.
>
> After debugging, I could understand the issue with aggregating huge data
> from StreamCache with MulticastProcessor.
>
> In MulticastProcessor.java: doProcessParallel() is called and on completion
> of the branch exchange of multicast, the CachedOutputStream deletes / cleans
> up the temporary file.
>
> This happens even before the multicast branch exchange reaches the
> aggregation Strategy, which tries to read the data from the branch exchange.
> In case of huge data in StreamCache, the temporary file is already deleted,
> leading to FileNotFound issues.
>
> *
> public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
>     this.strategy = exchange.getContext().getStreamCachingStrategy();
>     currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());
>
>     if (closedOnCompletion) {
>         // add on completion so we can cleanup after the exchange is
>         // done such as deleting temporary files
>         exchange.addOnCompletion(new SynchronizationAdapter() {
>             @Override
>             public void onDone(Exchange exchange) {
>                 try {
>                     if (fileInputStreamCache != null) {
>                         fileInputStreamCache.close();
>                     }
>                     close();
>                 } catch (Exception e) {
>                     LOG.warn("Error deleting temporary cache file: " + tempFile, e);
>                 }
>             }
>
>             @Override
>             public String toString() {
>                 return "OnCompletion[CachedOutputStream]";
>             }
>         });
>     }
> }
>
> public void close() throws IOException {
>     currentStream.close();
>     cleanUpTempFile();
> }
>
> *
> <http://camel.465427.n5.nabble.com/file/n5756092/StreamCache_File_Gets_Deleted_before_Aggregation.png>
>
> I was able to circumvent the issue, if I try to set closedOnCompletion=
> false, while writing to CachedOutputStream in any component in any Multicast
> branch.
> But this is a leaky solution, because the streamcache temporary file(s) may
> then never get cleaned up.
>
> Can the MulticastProcessor be adjusted so that the multicast branch
> exchanges reach 'completion' status only, after they have been aggregated at
> the end of multicast?
>
> Please help / advise on the issue, as I am new to using camel Multicast.
>
> Thanks,
> Lakshmi
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Stream-Cache-spool-file-deletion-before-aggregation-in-Multicast-involving-huge-data-tp5756092.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
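
As an illustration of the change asked for in the quoted message (branch exchanges should not clean up before they have been aggregated), here is a minimal plain-Java sketch. It is not Camel's MulticastProcessor and not necessarily how the ticket will be resolved: MiniExchange, handOverCleanupTo, branch and multicastAndAggregate are hypothetical names, and the only idea shown is that the cleanup callbacks of each branch are moved to the parent and run after aggregation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Camel-free simulation; every name in this class is hypothetical.
class DeferredCleanupSketch {

    /** Stand-in for an exchange: it only collects on-completion callbacks. */
    static class MiniExchange {
        final List<Runnable> onCompletion = new ArrayList<>();
        Path body;

        /** Moves all pending callbacks to target, so they run when target is done. */
        void handOverCleanupTo(MiniExchange target) {
            target.onCompletion.addAll(onCompletion);
            onCompletion.clear();
        }

        void done() {
            onCompletion.forEach(Runnable::run);
            onCompletion.clear();
        }
    }

    /** Every spool file created by this instance, kept only for inspection. */
    final List<Path> spoolFiles = new ArrayList<>();

    /** A multicast "branch": spools its payload and registers the deletion. */
    MiniExchange branch(String payload) {
        try {
            Path spool = Files.createTempFile("cos", ".tmp");
            Files.write(spool, payload.getBytes(StandardCharsets.UTF_8));
            spoolFiles.add(spool);
            MiniExchange exchange = new MiniExchange();
            exchange.body = spool;
            exchange.onCompletion.add(() -> {
                try {
                    Files.deleteIfExists(spool);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            return exchange;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs all branches, aggregates their bodies, then cleans up once. */
    String multicastAndAggregate(String... payloads) {
        MiniExchange parent = new MiniExchange();
        List<MiniExchange> branches = new ArrayList<>();
        for (String payload : payloads) {
            MiniExchange b = branch(payload);
            b.handOverCleanupTo(parent); // defer deletion to the parent
            b.done();                    // branch completes, file stays
            branches.add(b);
        }
        StringBuilder aggregated = new StringBuilder();
        try {
            for (MiniExchange b : branches) {
                aggregated.append(new String(Files.readAllBytes(b.body), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        parent.done(); // spool files are deleted only now
        return aggregated.toString();
    }

    /** Number of spool files that still exist on disk. */
    long remainingSpoolFiles() {
        return spoolFiles.stream().filter(p -> Files.exists(p)).count();
    }
}
```

With this ordering the aggregation step can still read every branch body, and no spool file is left behind once the parent is done, which avoids the leak of the closedOnCompletion=false workaround.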