Hi

Yeah, we should likely have a StreamCacheHelper, or introduce a copy(boolean clone) method that takes a boolean (with a better name) indicating that it makes an independent copy. Then we can keep the inner details of how the copy is made inside the stream cache implementations.
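To make the copy(boolean) idea concrete, here is a minimal sketch in plain Java, not Camel code. SpooledBody is a hypothetical stand-in for a file-backed stream cache, and the method and parameter names (spool, copy, independent, release) are assumptions for illustration only. It shows that an independent copy gets its own temp file and therefore survives deletion of the original.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical stand-in for a file-backed stream cache (NOT Camel's FileInputStreamCache).
final class SpooledBody {
    private final Path file;

    private SpooledBody(Path file) {
        this.file = file;
    }

    // Spool the given bytes to a new temp file.
    static SpooledBody spool(byte[] data) throws IOException {
        Path tmp = Files.createTempFile("spool", ".tmp");
        Files.write(tmp, data);
        return new SpooledBody(tmp);
    }

    // independent == false: the copy shares the same underlying file.
    // independent == true: the copy gets its own duplicate of the file,
    // so it stays readable after the original has been released.
    SpooledBody copy(boolean independent) throws IOException {
        if (!independent) {
            return new SpooledBody(file);
        }
        Path dup = Files.createTempFile("spool-copy", ".tmp");
        Files.copy(file, dup, StandardCopyOption.REPLACE_EXISTING);
        return new SpooledBody(dup);
    }

    byte[] readAll() throws IOException {
        return Files.readAllBytes(file);
    }

    // Delete the backing file, as happens when the owning route finishes.
    void release() throws IOException {
        Files.deleteIfExists(file);
    }
}

class SpooledBodyDemo {
    public static void main(String[] args) throws IOException {
        SpooledBody original = SpooledBody.spool("hello".getBytes(StandardCharsets.UTF_8));
        SpooledBody tapped = original.copy(true);
        original.release(); // the parent route is done and removes its file
        System.out.println(new String(tapped.readAll(), StandardCharsets.UTF_8));
        tapped.release();
    }
}
```

With copy(false) the second read would fail once the original is released, which is the failure reported in this thread. The trade-off of the independent copy is extra disk I/O per wire tap.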

The wire tap already makes a copy of the stream cache today, but it likely needs that independent (clone) copy. We could also make that the default, though I think the multicast EIP makes a copy as well; it may reuse the same underlying file and only delete it when the last exchange is done and closes it.

On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <hekon...@gmail.com> wrote:
> Hi,
>
> You can also use Wiretap's onPrepareRef option and use a custom processor to
> copy the content of the cached body.
>
> Franz, would you be so kind as to create a pull request with your fix?
> Somebody will review it and merge. Thanks in advance!
>
> Cheers!
>
> On Tue, 21 Apr 2015 at 16:25, Franz Paul Forsthofer
> <emc2...@googlemail.com> wrote:
>
>> Hi Geert,
>>
>> it is a bug. As a workaround, you can try setting the threshold
>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>> number; then the body will be kept in memory.
>>
>> Alternatively, you can modify the code of the Camel class
>> org.apache.camel.processor.WireTapProcessor.
>> You have to modify the method configureCopyExchange in the following way:
>>
>>     private Exchange configureCopyExchange(Exchange exchange) throws IOException {
>>
>>         // must use a copy as we don't want it to cause side effects on
>>         // the original exchange
>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
>>
>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>             // the file stream must be copied, otherwise you get errors
>>             // because the stream file is removed when the parent route is finished
>>             FileInputStreamCache streamCache = (FileInputStreamCache) exchange.getIn().getBody();
>>             CachedOutputStream cos = new CachedOutputStream(copy);
>>             try {
>>                 IOHelper.copy(streamCache, cos);
>>             } finally {
>>                 IOHelper.close(streamCache, cos);
>>                 streamCache.reset();
>>             }
>>             copy.getIn().setBody(cos.newStreamCache());
>>         }
>>         // set MEP to InOnly as this wire tap is a fire and forget
>>         copy.setPattern(ExchangePattern.InOnly);
>>         return copy;
>>     }
>>
>> The idea behind this is to make a copy of the stream cache file, so
>> that you get an additional stream cache file for the second route (in
>> your case for the route "direct:x"). This second stream cache file
>> will be deleted when the second route is finished.
>>
>> I also hope that this issue will be fixed. I am not a committer, so I
>> cannot say when this issue will be solved; I have made contributions
>> which solved a similar problem in the aggregator and splitter.
>>
>> I think you can open a Jira ticket with the above solution suggestion.
>>
>> Regards Franz
>>
>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>> <geer...@aviovision.com> wrote:
>> > Hi Franz,
>> >
>> > is this something that will be fixed in an upcoming release? Is it a bug
>> > or does it work as designed?
>> > Can we use a workaround to avoid this behaviour, for example by not
>> > deleting the temp files?
>> >
>> > Kind regards,
>> >
>> > Geert
>> >
>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer
>> > <emc2...@googlemail.com> wrote:
>> >
>> >> Hello Geert,
>> >>
>> >> there is no solution yet for your problem. Currently the stream cache
>> >> file is removed at the end of the route which created the file. In
>> >> your case the stream cache file is deleted when the "direct:start"
>> >> route is finished. The wire tap runs in a separate thread and
>> >> therefore it can happen that it tries to read the cached file when it
>> >> is already deleted, especially when you have a delay in the wiretap
>> >> route ("direct:x").
>> >>
>> >> Regards Franz
>> >>
>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>> >> <geer...@aviovision.com> wrote:
>> >> > Hi,
>> >> >
>> >> > I noticed a bug where the body (StreamCache) was already removed before
>> >> > the exchange reached the end (in the Wiretap route).
>> >> >
>> >> > I found the following ticket
>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>> >> > https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>> >> > but I think it still doesn't fix the Wiretap problem.
>> >> >
>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>> >> > StreamCaching or remove the delay it works, enabling it again will break
>> >> > the test.
>> >> >
>> >> > ============
>> >> > import org.apache.camel.builder.RouteBuilder;
>> >> > import org.apache.camel.component.mock.MockEndpoint;
>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>> >> > import org.junit.Before;
>> >> > import org.junit.Test;
>> >> >
>> >> > public class WireTapTest extends CamelTestSupport {
>> >> >
>> >> >     private MockEndpoint y;
>> >> >     private MockEndpoint z;
>> >> >
>> >> >     @Before
>> >> >     public void prepareEndpoints() {
>> >> >         y = getMockEndpoint("mock:file:y");
>> >> >         z = getMockEndpoint("mock:file:z");
>> >> >     }
>> >> >
>> >> >     @Test
>> >> >     public void testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>> >> >             throws InterruptedException {
>> >> >         y.expectedMessageCount(1);
>> >> >         z.expectedMessageCount(1);
>> >> >
>> >> >         // test.txt should contain more than one character
>> >> >         template.sendBody("direct:start",
>> >> >                 this.getClass().getResourceAsStream("/test.txt"));
>> >> >
>> >> >         assertMockEndpointsSatisfied();
>> >> >     }
>> >> >
>> >> >     @Override
>> >> >     protected RouteBuilder createRouteBuilder() throws Exception {
>> >> >         return new RouteBuilder() {
>> >> >             @Override
>> >> >             public void configure() throws Exception {
>> >> >                 StreamCachingStrategy streamCachingStrategy = new DefaultStreamCachingStrategy();
>> >> >                 streamCachingStrategy.setSpoolThreshold(1);
>> >> >                 context.setStreamCachingStrategy(streamCachingStrategy);
>> >> >                 context.setStreamCaching(true);
>> >> >
>> >> >                 from("direct:start")
>> >> >                     .wireTap("direct:x")
>> >> >                     .to("file:y");
>> >> >
>> >> >                 from("direct:x")
>> >> >                     .delay(2000)
>> >> >                     .to("file:z");
>> >> >             }
>> >> >         };
>> >> >     }
>> >> >
>> >> >     @Override
>> >> >     public String isMockEndpoints() {
>> >> >         return "(file:z|file:y)";
>> >> >     }
>> >> > }
>> >> > =============
>> >> >
>> >> > If
>> >> > you run the test you can clearly see the temp file deletion followed by
>> >> > the closed stream exception:
>> >> >
>> >> > Tried 1 to delete file:
>> >> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >> > with result: true
>> >> >
>> >> > Cannot reset stream from file
>> >> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >> >
>> >> > I encountered the same issue during a more complex route that does some
>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1 so it
>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284 but I
>> >> > need to test this.
>> >> >
>> >> > Kind regards,
>> >> >
>> >> > Geert

--
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/