On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer <emc2...@googlemail.com> wrote:
> Hello Claus and Henryk,
>
> my original proposal to copy the stream cache file is not the optimal
> solution. A better solution would be to have only one stream cache
> file and to delete this file only when all exchanges which need this
> file are done. This means we have to register listeners for the
> onDone event of the UnitOfWork of each relevant exchange in the
> stream cache file object, and only when all listeners have received the
> onDone event can the file be deleted. However, this will require
> quite some changes.
>
> We could probably also use this solution for the aggregator and splitter case.
>
Yeah, though that entails that the stream cache needs to keep this state and
be tailored not to just 1 exchange but to potentially N+ exchanges. But a good
idea nevertheless, especially for big streams. Logging a JIRA ticket is welcome.

> Regards Franz
>
> On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:
>> Hi
>>
>> Yeah we should likely have a StreamCacheHelper or introduce a
>> copy(boolean clone) method that takes a boolean (with a better name)
>> that indicates it does an independent copy. Then we can keep the inner
>> details of how this copy is done in those stream cache implementations.
>>
>> The wire tap already does a copy of the stream cache today. But it
>> likely needs that clone copy. We could also make that the default.
>> Though I think the multicast EIP does a copy as well, but it may reuse the
>> same underlying file, and only delete it when the last exchange is done
>> and closes it.
>>
>>
>>
>> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <hekon...@gmail.com> wrote:
>>> Hi,
>>>
>>> You can also use Wiretap's onPrepareRef option and use a custom processor to
>>> copy the content of the cached body.
>>>
>>> Franz, would you be so kind and create a pull request with your fix?
>>> Somebody will review it and merge. Thanks in advance!
>>>
>>> Cheers!
>>>
>>> On Tue, 21 Apr 2015 at 16:25, Franz Paul Forsthofer <
>>> emc2...@googlemail.com> wrote:
>>>
>>>> Hi Geert,
>>>>
>>>> it is a bug. As a workaround you can try to set the threshold
>>>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>>>> number; then the body will be kept in memory.
>>>>
>>>> Alternatively, you can modify the code of the Camel class
>>>> org.apache.camel.processor.WireTapProcessor. You have to modify the
>>>> method configureCopyExchange in the following way:
>>>>
>>>>
>>>> private Exchange configureCopyExchange(Exchange exchange) throws IOException {
>>>>
>>>>     // must use a copy as we don't want it to cause side effects on
>>>>     // the original exchange
>>>>     Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
>>>>
>>>>     if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>>>         // the file stream must be copied, otherwise you get errors
>>>>         // because the stream file is removed when the parent route is finished
>>>>         FileInputStreamCache streamCache = (FileInputStreamCache) exchange.getIn().getBody();
>>>>         CachedOutputStream cos = new CachedOutputStream(copy);
>>>>         try {
>>>>             IOHelper.copy(streamCache, cos);
>>>>         } finally {
>>>>             IOHelper.close(streamCache, cos);
>>>>             streamCache.reset();
>>>>         }
>>>>         copy.getIn().setBody(cos.newStreamCache());
>>>>     }
>>>>     // set MEP to InOnly as this wire tap is a fire and forget
>>>>     copy.setPattern(ExchangePattern.InOnly);
>>>>     return copy;
>>>> }
>>>>
>>>> The idea behind this is to make a copy of the stream cache file, so
>>>> that you get an additional stream cache file for the second route (in
>>>> your case for the route "direct:x"). This second stream cache file
>>>> will be deleted when the second route is finished.
>>>>
>>>> I also hope that this issue will be fixed. I am no committer so I
>>>> cannot say when this issue will be solved; I have made contributions
>>>> which solved a similar problem in the aggregator and splitter.
>>>>
>>>> I think you can open a Jira ticket with the above solution suggestion.
>>>>
>>>> Regards Franz
>>>>
>>>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>>>> <geer...@aviovision.com> wrote:
>>>> > Hi Franz,
>>>> >
>>>> > is this something that will be fixed in an upcoming release? Is it a bug or
>>>> > does it work as designed?
>>>> > Can we use a workaround to avoid this behaviour, for example by not
>>>> > deleting the temp files?
>>>> >
>>>> >
>>>> > Kind regards,
>>>> >
>>>> > Geert
>>>> >
>>>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
>>>> > emc2...@googlemail.com> wrote:
>>>> >
>>>> >> Hello Geert,
>>>> >>
>>>> >> there is no solution yet for your problem. Currently the stream cache
>>>> >> file is removed at the end of the route which created the file. In
>>>> >> your case the stream cache file is deleted when the "direct:start"
>>>> >> route is finished. The wire tap runs in a separate thread and
>>>> >> therefore it can happen that it tries to read the cached file when it
>>>> >> is already deleted, especially when you have a delay in the wiretap
>>>> >> route ("direct:x").
>>>> >>
>>>> >>
>>>> >> Regards Franz
>>>> >>
>>>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>>>> >> <geer...@aviovision.com> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > I noticed a bug where the body (StreamCache) was already removed before the
>>>> >> > exchange reached the end (in the Wiretap route).
>>>> >> >
>>>> >> > I found the following ticket
>>>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>>>> >> > https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>>>> >> > but I think it still doesn't fix the Wiretap problem.
>>>> >> >
>>>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>>>> >> > StreamCaching or remove the delay it works, enabling it again will break
>>>> >> > the test.
>>>> >> >
>>>> >> > ============
>>>> >> > import org.apache.camel.builder.RouteBuilder;
>>>> >> > import org.apache.camel.component.mock.MockEndpoint;
>>>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>>>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>>>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>>>> >> > import org.junit.Before;
>>>> >> > import org.junit.Test;
>>>> >> >
>>>> >> > public class WireTapTest extends CamelTestSupport {
>>>> >> >
>>>> >> >     private MockEndpoint y;
>>>> >> >     private MockEndpoint z;
>>>> >> >
>>>> >> >     @Before
>>>> >> >     public void prepareEndpoints() {
>>>> >> >         y = getMockEndpoint("mock:file:y");
>>>> >> >         z = getMockEndpoint("mock:file:z");
>>>> >> >     }
>>>> >> >
>>>> >> >     @Test
>>>> >> >     public void testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>>>> >> >             throws InterruptedException {
>>>> >> >         y.expectedMessageCount(1);
>>>> >> >         z.expectedMessageCount(1);
>>>> >> >
>>>> >> >         // test.txt should contain more than one character
>>>> >> >         template.sendBody("direct:start",
>>>> >> >                 this.getClass().getResourceAsStream("/test.txt"));
>>>> >> >
>>>> >> >         assertMockEndpointsSatisfied();
>>>> >> >     }
>>>> >> >
>>>> >> >     @Override
>>>> >> >     protected RouteBuilder createRouteBuilder() throws Exception {
>>>> >> >         return new RouteBuilder() {
>>>> >> >             @Override
>>>> >> >             public void configure() throws Exception {
>>>> >> >                 StreamCachingStrategy streamCachingStrategy = new DefaultStreamCachingStrategy();
>>>> >> >                 streamCachingStrategy.setSpoolThreshold(1);
>>>> >> >                 context.setStreamCachingStrategy(streamCachingStrategy);
>>>> >> >                 context.setStreamCaching(true);
>>>> >> >
>>>> >> >                 from("direct:start")
>>>> >> >                     .wireTap("direct:x")
>>>> >> >                     .to("file:y");
>>>> >> >
>>>> >> >                 from("direct:x")
>>>> >> >                     .delay(2000)
>>>> >> >                     .to("file:z");
>>>> >> >             }
>>>> >> >         };
>>>> >> >     }
>>>> >> >
>>>> >> >     @Override
>>>> >> >     public String isMockEndpoints() {
>>>> >> >         return "(file:z|file:y)";
>>>> >> >     }
>>>> >> > }
>>>> >> > =============
>>>> >> >
>>>> >> > If you run the test you can clearly see the temp file deletion followed by
>>>> >> > the closed stream exception:
>>>> >> >
>>>> >> > Tried 1 to delete file:
>>>> >> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>>> >> > with result: true
>>>> >> >
>>>> >> > Cannot reset stream from file
>>>> >> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>>> >> >
>>>> >> > I encountered the same issue during a more complex route that does some
>>>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1 so it
>>>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284 but I
>>>> >> > need to test this.
>>>> >> >
>>>> >> > Kind regards,
>>>> >> >
>>>> >> > Geert
>>>> >>
>>>> >>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cib...@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen
>> hawtio: http://hawt.io/
>> fabric8: http://fabric8.io/

--
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/
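
The single-file idea discussed in this thread (keep one stream cache file and
delete it only once every exchange that needs it is done) can be sketched as a
small reference counter. This is only an illustration in plain Java, not Camel
code: SharedSpoolFile, retain() and release() are hypothetical names, and
calling release() from each exchange's UnitOfWork onDone callback is assumed
rather than shown.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical holder for one spooled stream cache file that is shared by
 * several exchanges (for example the parent route and a wire tap).
 * Illustration only; this class does not exist in Camel.
 */
final class SharedSpoolFile {
    private final Path file;
    // Number of exchanges that still need the file.
    private final AtomicInteger users = new AtomicInteger();

    SharedSpoolFile(Path file) {
        this.file = file;
    }

    /** Registers one more exchange. Must be called before that exchange starts using the file. */
    void retain() {
        users.incrementAndGet();
    }

    /**
     * Assumed to be called once per registered exchange when it is done.
     * Deletes the file when the last user releases it.
     *
     * @return true if this call deleted the file
     */
    boolean release() throws IOException {
        int remaining = users.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("release() called without a matching retain()");
        }
        return remaining == 0 && Files.deleteIfExists(file);
    }

    boolean exists() {
        return Files.exists(file);
    }
}

final class SharedSpoolFileDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("cos", ".tmp");
        SharedSpoolFile spool = new SharedSpoolFile(tmp);

        spool.retain(); // parent route ("direct:start" in the test above)
        spool.retain(); // wire tap route ("direct:x" in the test above)

        spool.release(); // parent route finishes first
        System.out.println("after parent done, file exists: " + spool.exists());

        spool.release(); // delayed wire tap route finishes last
        System.out.println("after wire tap done, file exists: " + spool.exists());
    }
}
```

The counter has to be incremented for the wire tap copy before the parent
exchange can finish, otherwise the parent's release() would still delete the
file too early. That ordering is the state the stream cache would need to keep
for N exchanges.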