Hello,

I have a very simple example which shows that the problem does not
only occur for MultiCast. The problem always occurs when in a
sub-route the message body is streamed via CachedOutputStream to the
file system and the cached file is deleted at the end of the
soub-route, although the main-route needs access to the result body of
the sub-route.

public class StreamCacheInSubRouteTest extends ContextTestSupport {

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
   @Override
   public void configure() throws Exception {
context.setStreamCaching(true);
context.getStreamCachingStrategy().setEnabled(true);
context.getStreamCachingStrategy().setSpoolDirectory(
"target/camel/cache");
context.getStreamCachingStrategy().setSpoolThreshold(1l);

from("direct:startError")
.to("direct:subroute").to("mock:result");

from("direct:subroute") //
.process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
CachedOutputStream cos = new CachedOutputStream(exchange);
   String s = "Test Message 1";
   cos.write(s.getBytes(Charset.forName("UTF-8")));
   cos.close();
   InputStream is = (InputStream) cos.newStreamCache();

   exchange.getOut().setBody(is);
   }
}) //
.to("mock:result_subroute");
   }

};

    }

    @Test
    public void test() throws Exception {

MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
template.sendBody("direct:startError", "<start></start>");

byte[] data = mock.getReceivedExchanges().get(0).getIn()
.getBody(byte[].class);
assertEquals("Test Message 1", new String(data, "UTF-8"));
    }

}

I will add this example to the jira ticket mentioned by Claus.

Regards Franz

On Fri, Sep 5, 2014 at 1:23 PM, lakshmi.prashant
<lakshmi.prash...@gmail.com> wrote:
> Hi,
>
> Mybeans.xml <http://camel.465427.n5.nabble.com/file/n5756092/Mybeans.xml>
>
> *Issue:    *
>
>   Whenever data is spooled in file via CachedOutputStream in any camel
> component in a multicast branch, that data becomes unreadable in
>
> a) Aggregation Strategy of Multicast
> b) After multicast, in case there is no aggregation strategy
>
> We are getting:
>
> a) FileNotFound issues as the file is deleted on completion of the cloned
> branch exchange.
> b) Premature end of file, when we read data from InputStream and use
> XMLReader / STAX to read the data.
>
> If we use the Constructor: new CachedOutputStream(exchange, false), the
> streamcache file will not be deleted.
> But the file may never be cleaned up & we do not want to do that.
>
>
> *Details*
>
>  We are using camel 2.13.2  - I have a multicast route with an
> AggregationStrategy.
>  And in each multicast branch, we have a custom camel component that returns
> huge data (around 4 MB) and writes to StreamCache (CachedOutputStream) and
> we need to aggregate the data in the multicast (AggregationStrrategy).
>
>
>   In the Aggregation strategy, I need to do XPath evaluation using camel
> XPathBuilder.
>   Hence, I try to read the body and convert from StreamCache to byte[] to
> avoid 'Error during type conversion from type:
> org.apache.camel.converter.stream.InputStreamCache.' in the XPathBuilder.
>
> When I try to read the body in the beginning of the AggregationStrategy, I
> get the following error.
> *
> /tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
> (No such file or directory), cause:
> FileNotFoundException:/tmp/camel/camel-tmp-4e00bf8a-4a42-463a-b046-5ea2d7fc8161/cos6047774870387520936.tmp
> (No such file or directory).
>         at java.io.FileInputStream.open(Native Method)
>         at java.io.FileInputStream.<init>(FileInputStream.java:138)
>         at
> org.apache.camel.converter.stream.FileInputStreamCache.createInputStream(FileInputStreamCache.java:123)
> at
> org.apache.camel.converter.stream.FileInputStreamCache.getInputStream(FileInputStreamCache.java:117)
>         at
> org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:93)
>         at
> org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)
>         at
> com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToByteArray(MergeAtXPathAggregationStrategy.java:169)
>         at
> com.sap.it.rt.camel.aggregate.strategies.MergeAtXPathAggregationStrategy.convertToXpathCompatibleType(MergeAtXPathAggregationStrategy.java:161)
>
> Following is the line of code where it is throwing an error:
>
>                             Object body = exchange.getIn().getBody();
>                 if( body instanceof StreamCache){
>                         StreamCache cache = (StreamCache)body;
>                         xml = new
> String(convertToByteArray(cache,exchange));
> exchange.getIn().setBody(xml);
>                 }
> *
>
>
>
> By disabling stream cache to write to file by setting a threshold of 10MB in
> multicast related routes,  we were able to work with the aggregation
> strategy. But we do not want to do that, as we may have incoming data that
> maybe bigger.
>
> <camel:camelContext id="multicast_xml_1" streamCache="true">
> <camel:properties>
> <camel:property key="CamelCachedOutputStreamCipherTransformation"
> value="RC4"/>
> <camel:property key=&quot;CamelCachedOutputStreamThreshold&quot;
> value=&quot;&lt;b>100000000*"/>
> </camel:properties>
>
> Note: The FileNotFound issue does not appear if we have the StreamCache
> based camel component in the route with other processors, but without
> Multicast + Aggregation .
>
> After debugging, I could understand the issue with aggregating huge data
> from StreamCache with MulticastProcessor.
>
> In MulticastProcessor.java: doProcessParallel() is called and on completion
> of the branch exchange of multicast, the CachedOutputStream deletes / cleans
> up the temporary file.
>
>  This happens even before the multicast branch exchange reaches the
> aggregation Strategy, which tries to read the data from the branch exchange.
> In case of huge data in StreamCache, the temporary file is already deleted,
> leading to FileNotFound issues.
>
>     *
>     public CachedOutputStream(Exchange exchange, boolean closedOnCompletion)
> {
>         this.strategy = exchange.getContext().getStreamCachingStrategy();
>         currentStream = new
> CachedByteArrayOutputStream(strategy.getBufferSize());
>
>         if (closedOnCompletion) {
>             // add on completion so we can cleanup after the exchange is
> done such as deleting temporary files
>             exchange.addOnCompletion(new SynchronizationAdapter() {
>                 @Override
>                 public void onDone(Exchange exchange) {
>                     try {
>                         if (fileInputStreamCache != null) {
>                             fileInputStreamCache.close();
>                         }
>                         close();                    } catch (Exception e) {
>                         LOG.warn("Error deleting temporary cache file: " +
> tempFile, e);
>                     }
>                 }
>
>                 @Override
>                 public String toString() {
>                     return "OnCompletion[CachedOutputStream]";
>                 }
>             });
>         }
>     }
>
>    public void close() throws IOException {
>         currentStream.close();
>         cleanUpTempFile();
>     }
>
> *
> <http://camel.465427.n5.nabble.com/file/n5756092/StreamCache_File_Gets_Deleted_before_Aggregation.png>
>
> I was able to circumvent the issue, if I try to set closedOnCompletion=
> false, while writing to CachedOutputStream in any component in any Multicast
> branch.
> But this is a leaky solution, because the streamcache temporary file(s) may
> then never get cleaned up.
>
> Can the MulticastProcessor be adjusted so that the multicast branch
> exchanges reach 'completion' status only, after they have been aggregated at
> the end of multicast?
>
> Please help / advise on the issue, as I am new to using camel Multicast.
>
> Thanks,
> Lakshmi
>
>
>
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Stream-Cache-spool-file-deletion-before-aggregation-in-Multicast-involving-huge-data-tp5756092.html
> Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to