Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5755810/Mybeans.xml>
Hi, We are using camel 2.13.2 - I have a multicast route with an AggregationStrategy. And in each multicast branch, we have a custom camel component that returns huge data (around 4 MB) and writes to StreamCache (CachedOutputStream) and we need to aggregate the data in the multicast (AggregationStrrategy). In the Aggregation strategy, I need to do XPath evaluation using camel XPathBuilder. Hence, I try to read the body and convert from StreamCache to byte[] to avoid 'Error during type conversion from type: org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder. When I try to read the body in the beginning of the AggregationStrategy, I get the following error. */tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp (No such file or directory), cause: FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp (No such file or directory). at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.<init>(FileInputStream.java:138) at org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123)* at org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117) at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93) at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102) at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169) at com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161) Following is the line of code where it is throwing an error: Object body = exchange.getIn().getBody(); if( body instanceof StreamCache){ StreamCache cache = (StreamCache)body; xml = new String(*convertToByteArray(cache,exchange));* exchange.getIn().setBody(xml); } By disabling stream cache to write to file by setting a threshold of 10MB in multicast related routes, we were able to work with the aggregation strategy. But we do not want to do that, as we may have incoming data that maybe bigger. <camel:camelContext id="multicast_xml_1" streamCache="true"> <camel:properties> <camel:property key="CamelCachedOutputStreamCipherTransformation" value="RC4"/> <camel:property key="CamelCachedOutputStreamThreshold" value="100000000"/> </camel:properties> Note: The FileNotFound issue does not appear if we have the *StreamCache based camel component* in the route with other processors, *but without Multicast + Aggregation *. Can you please let us know why the streamcache related temporary file gets deleted before the aggregation of the branch exchange? After debugging, I could understand the issue with aggregating huge data from StreamCache with MulticastProcessor. In MulticastProcessor.java: doProcessParallel() is called and on completion of the branch exchange of multicast, the CachedOutputStream deletes / cleans up the temporary file. This happens even before the multicast branch exchange reaches the aggregation Strategy, which tries to read the data from the branch exchange. In case of huge data in StreamCache, the temporary file is already deleted, leading to FileNotFound issues. public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) { this.strategy = exchange.getContext().getStreamCachingStrategy(); currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize()); if (closedOnCompletion) { // add on completion so we can cleanup after the exchange is done such as deleting temporary files exchange.addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { try { if (fileInputStreamCache != null) { fileInputStreamCache.close(); } * close();* } catch (Exception e) { LOG.warn("Error deleting temporary cache file: " + tempFile, e); } } @Override public String toString() { return "OnCompletion[CachedOutputStream]"; } }); } } * public void close() throws IOException { currentStream.close(); cleanUpTempFile(); }* <http://camel.465427.n5.nabble.com/file/n5756005/StreamCache_File_Gets_Deleted_before_Aggregation.png> I was able to circumvent the issue, if I try to set *closedOnCompletion= false, while writing to CachedOutputStream in any component in any Multicast branch.* But this is a leaky solution, because the streamcache temporary file(s) may then never get cleaned up. Can the MulticastProcessor be adjusted so that the multicast branch exchanges reach 'completion' status only, after they have been aggregated at the end of multicast? Please help / advise on the issue and help us resolve the issue Thanks, Lakshmi -- View this message in context: http://camel.465427.n5.nabble.com/Stream-Cache-file-deletion-before-aggregation-in-Multicast-involving-huge-data-tp5756005.html Sent from the Camel Development mailing list archive at Nabble.com.