Ticket created: https://issues.apache.org/jira/browse/CAMEL-8688
Regards,

Geert

On Thu, Apr 23, 2015 at 1:25 PM, Claus Ibsen <claus.ib...@gmail.com> wrote:

> On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer
> <emc2...@googlemail.com> wrote:
> > Hello Claus and Henryk,
> >
> > my original proposal to copy the stream cache file is not the optimal
> > solution. A better solution would be to have only one stream cache
> > file and to delete this file only when all exchanges which need this
> > file are done. This means we have to register listeners for the
> > onDone event of all UnitOfWorks of the relevant exchanges in the
> > stream cache file object; only when all listeners have received the
> > onDone event can the file be deleted. However, this will require
> > quite some changes.
> >
> > Probably we could also use this solution for the aggregator and
> > splitter case.
> >
>
> Yeah, though that entails the stream cache needs to keep this state and
> be tailored to not only 1 exchange but to potentially N+ exchanges.
>
> But a good idea nevertheless. Especially for big streams.
>
> Logging a JIRA ticket is welcome.
>
> >
> > Regards Franz
> >
> > On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:
> >> Hi
> >>
> >> Yeah we should likely have a StreamCacheHelper or introduce a
> >> copy(boolean clone) method that takes a boolean (with a better name)
> >> that indicates it does an independent copy. Then we can keep the inner
> >> details of how this copy is done in those stream cache implementations.
> >>
> >> The wire tap already does a copy of the stream cache today. But it
> >> likely needs that clone copy. We could also make that the default.
> >> Though I think multicast eip does a copy as well but it may reuse the
> >> same underlying file, and only delete it when the last exchange is done
> >> and closes it.
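The "one shared file, delete it when the last exchange is done" idea discussed above can be sketched in plain Java as a reference count. This is only an illustration under stated assumptions: SharedSpoolFile, retain(), release(), exists() and createTemp() are hypothetical names and not part of Camel, and in Camel the release() call would presumably be driven by each exchange's onDone callback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (NOT a Camel class): one spool file shared by several
 * consumers and deleted only when the last consumer has released it.
 */
final class SharedSpoolFile {
    private final Path file;
    // Starts at 1: the exchange that created the file is the first user.
    private final AtomicInteger users = new AtomicInteger(1);

    SharedSpoolFile(Path file) {
        this.file = file;
    }

    /** Creates a small temp file to stand in for a spooled message body. */
    public static SharedSpoolFile createTemp() {
        try {
            Path tmp = Files.createTempFile("cos", ".tmp");
            Files.write(tmp, "body".getBytes(StandardCharsets.UTF_8));
            return new SharedSpoolFile(tmp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Registers one more user, e.g. a wire-tapped copy of the exchange. */
    public SharedSpoolFile retain() {
        // Never revive a file that every user has already released.
        int previous = users.getAndUpdate(n -> n == 0 ? 0 : n + 1);
        if (previous == 0) {
            throw new IllegalStateException("spool file already released");
        }
        return this;
    }

    /** Called once per user when it is done; true if this call deleted the file. */
    public boolean release() {
        int previous = users.getAndUpdate(n -> n > 0 ? n - 1 : 0);
        if (previous == 0) {
            throw new IllegalStateException("released more often than retained");
        }
        if (previous > 1) {
            return false; // other users still need the file
        }
        try {
            return Files.deleteIfExists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public boolean exists() {
        return Files.exists(file);
    }

    public static void main(String[] args) {
        SharedSpoolFile spool = SharedSpoolFile.createTemp();
        spool.retain();   // wire tap copy starts using the file
        spool.release();  // parent route done, file is kept
        spool.release();  // wire tap route done, file is deleted
    }
}
```

The count lives in the file object itself, which matches the remark above that the stream cache would have to track N exchanges rather than one.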
> >>
> >>
> >> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <hekon...@gmail.com> wrote:
> >>> Hi,
> >>>
> >>> You can also use Wiretap's onPrepareRef option and use a custom
> >>> processor to copy the content of the cached body.
> >>>
> >>> Franz, would you be so kind as to create a pull request with your fix?
> >>> Somebody will review it and merge. Thanks in advance!
> >>>
> >>> Cheers!
> >>>
> >>> On Tue, Apr 21, 2015 at 4:25 PM, Franz Paul Forsthofer
> >>> <emc2...@googlemail.com> wrote:
> >>>
> >>>> Hi Geert,
> >>>>
> >>>> it is a bug. As a workaround, you can set the threshold
> >>>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
> >>>> number; then the body will be kept in memory.
> >>>>
> >>>> Alternatively, you can modify the code of the Camel class
> >>>> org.apache.camel.processor.WireTapProcessor. You have to modify the
> >>>> method configureCopyExchange in the following way:
> >>>>
> >>>>
> >>>>     private Exchange configureCopyExchange(Exchange exchange) throws IOException {
> >>>>
> >>>>         // must use a copy as we don't want it to cause side effects of
> >>>>         // the original exchange
> >>>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
> >>>>
> >>>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
> >>>>             // the file stream must be copied, otherwise you get errors
> >>>>             // because the stream file is removed when the parent route is finished
> >>>>             FileInputStreamCache streamCache = (FileInputStreamCache) exchange.getIn().getBody();
> >>>>             CachedOutputStream cos = new CachedOutputStream(copy);
> >>>>             try {
> >>>>                 IOHelper.copy(streamCache, cos);
> >>>>             } finally {
> >>>>                 IOHelper.close(streamCache, cos);
> >>>>                 streamCache.reset();
> >>>>             }
> >>>>             copy.getIn().setBody(cos.newStreamCache());
> >>>>         }
> >>>>         // set MEP to InOnly as this wire tap is a fire and forget
> >>>>         copy.setPattern(ExchangePattern.InOnly);
> >>>>         return copy;
> >>>>     }
> >>>>
> >>>> The idea behind this is to make a copy of the stream cache file, so
> >>>> that you get an additional stream cache file for the second route (in
> >>>> your case for the route "direct:x"). This second stream cache file
> >>>> will be deleted when the second route is finished.
> >>>>
> >>>> I also hope that this issue will be fixed. I am not a committer, so I
> >>>> cannot say when this issue will be solved; I have made contributions
> >>>> which solved a similar problem in the aggregator and splitter.
> >>>>
> >>>> I think you can open a Jira ticket with the above solution suggestion.
> >>>>
> >>>> Regards Franz
> >>>>
> >>>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
> >>>> <geer...@aviovision.com> wrote:
> >>>> > Hi Franz,
> >>>> >
> >>>> > is this something that will be fixed in an upcoming release? Is it a
> >>>> > bug or does it work as designed?
> >>>> > Can we use a workaround to avoid this behaviour, for example by not
> >>>> > deleting the temp files?
> >>>> >
> >>>> >
> >>>> > Kind regards,
> >>>> >
> >>>> > Geert
> >>>> >
> >>>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer
> >>>> > <emc2...@googlemail.com> wrote:
> >>>> >
> >>>> >> Hello Geert,
> >>>> >>
> >>>> >> there is no solution yet for your problem. Currently the stream cache
> >>>> >> file is removed at the end of the route which created the file. In
> >>>> >> your case the stream cache file is deleted when the "direct:start"
> >>>> >> route is finished. The wire tap runs in a separate thread and
> >>>> >> therefore it can happen that it tries to read the cached file when it
> >>>> >> is already deleted, especially when you have a delay in the wiretap
> >>>> >> route ("direct:x").
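The failure mode described above (the parent route deletes the spooled file before the delayed wire tap route re-opens it) can be reproduced without Camel. The sketch below is a simplified stand-in, not Camel code: SpoolFileRaceDemo and both method names are made up for illustration, and the two routes are simulated by two sequential calls instead of real threads so that the outcome is deterministic.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Hypothetical demo class (NOT Camel code) of the delete-before-read problem. */
final class SpoolFileRaceDemo {

    /**
     * Simulates the parent route: spool the body to a temp file, finish,
     * and delete the file. Returns the path a late consumer still holds.
     */
    public static Path spoolThenFinishParentRoute(String body) {
        try {
            Path spool = Files.createTempFile("cos", ".tmp");
            Files.write(spool, body.getBytes(StandardCharsets.UTF_8));
            Files.delete(spool); // parent route is done: temp file removed
            return spool;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Simulates the delayed wire tap route re-opening the spooled body.
     * Returns true if the file could still be opened and read.
     */
    public static boolean lateConsumerCanRead(Path spool) {
        try (InputStream in = Files.newInputStream(spool)) {
            in.read();
            return true;
        } catch (NoSuchFileException e) {
            return false; // the file is already gone
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path spool = spoolThenFinishParentRoute("hello");
        System.out.println("late consumer can read: " + lateConsumerCanRead(spool));
    }
}
```

Keeping the body in memory or giving the tapped exchange its own copy of the file, as suggested in this thread, both avoid the second step having to open a path that someone else owns.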
> >>>> >>
> >>>> >>
> >>>> >> Regards Franz
> >>>> >>
> >>>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
> >>>> >> <geer...@aviovision.com> wrote:
> >>>> >> > Hi,
> >>>> >> >
> >>>> >> > I noticed a bug where the body (StreamCache) was already removed
> >>>> >> > before the exchange reached the end (in the Wiretap route).
> >>>> >> >
> >>>> >> > I found the following ticket
> >>>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
> >>>> >> > https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
> >>>> >> > but I think it still doesn't fix the Wiretap problem.
> >>>> >> >
> >>>> >> > Here you can find my test (executed on 2.15.1). If you disable the
> >>>> >> > StreamCaching or remove the delay it works, enabling it again will
> >>>> >> > break the test.
> >>>> >> >
> >>>> >> > ============
> >>>> >> > import org.apache.camel.builder.RouteBuilder;
> >>>> >> > import org.apache.camel.component.mock.MockEndpoint;
> >>>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
> >>>> >> > import org.apache.camel.spi.StreamCachingStrategy;
> >>>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
> >>>> >> > import org.junit.Before;
> >>>> >> > import org.junit.Test;
> >>>> >> >
> >>>> >> > public class WireTapTest extends CamelTestSupport {
> >>>> >> >
> >>>> >> >     private MockEndpoint y;
> >>>> >> >     private MockEndpoint z;
> >>>> >> >
> >>>> >> >     @Before
> >>>> >> >     public void prepareEndpoints() {
> >>>> >> >         y = getMockEndpoint("mock:file:y");
> >>>> >> >         z = getMockEndpoint("mock:file:z");
> >>>> >> >     }
> >>>> >> >
> >>>> >> >     @Test
> >>>> >> >     public void testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
> >>>> >> >             throws InterruptedException {
> >>>> >> >         y.expectedMessageCount(1);
> >>>> >> >         z.expectedMessageCount(1);
> >>>> >> >
> >>>> >> >         // test.txt should contain more than one character
> >>>> >> >         template.sendBody("direct:start",
> >>>> >> >                 this.getClass().getResourceAsStream("/test.txt"));
> >>>> >> >
> >>>> >> >         assertMockEndpointsSatisfied();
> >>>> >> >     }
> >>>> >> >
> >>>> >> >     @Override
> >>>> >> >     protected RouteBuilder createRouteBuilder() throws Exception {
> >>>> >> >         return new RouteBuilder() {
> >>>> >> >             @Override
> >>>> >> >             public void configure() throws Exception {
> >>>> >> >                 StreamCachingStrategy streamCachingStrategy = new DefaultStreamCachingStrategy();
> >>>> >> >                 streamCachingStrategy.setSpoolThreshold(1);
> >>>> >> >                 context.setStreamCachingStrategy(streamCachingStrategy);
> >>>> >> >                 context.setStreamCaching(true);
> >>>> >> >
> >>>> >> >                 from("direct:start")
> >>>> >> >                     .wireTap("direct:x")
> >>>> >> >                     .to("file:y");
> >>>> >> >
> >>>> >> >                 from("direct:x")
> >>>> >> >                     .delay(2000)
> >>>> >> >                     .to("file:z");
> >>>> >> >             }
> >>>> >> >         };
> >>>> >> >     }
> >>>> >> >
> >>>> >> >     @Override
> >>>> >> >     public String isMockEndpoints() {
> >>>> >> >         return "(file:z|file:y)";
> >>>> >> >     }
> >>>> >> > }
> >>>> >> > =============
> >>>> >> >
> >>>> >> > If you run the test you can clearly see the temp file deletion
> >>>> >> > followed by the closed stream exception:
> >>>> >> >
> >>>> >> > Tried 1 to delete file:
> >>>> >> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >>>> >> > with result: true
> >>>> >> >
> >>>> >> > Cannot reset stream from file
> >>>> >> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
> >>>> >> >
> >>>> >> > I encountered the same issue during a more complex route that does
> >>>> >> > some splitting (zip file) and multicasting. This occurred on Camel
> >>>> >> > 2.14.1 so it could be fixed by
> >>>> >> > https://issues.apache.org/jira/browse/CAMEL-8284 but I need to
> >>>> >> > test this.
> >>>> >> >
> >>>> >> > Kind regards,
> >>>> >> >
> >>>> >> > Geert
> >>>> >>
> >>>>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> Red Hat, Inc.
> >> Email: cib...@redhat.com
> >> Twitter: davsclaus
> >> Blog: http://davsclaus.com
> >> Author of Camel in Action: http://www.manning.com/ibsen
> >> hawtio: http://hawt.io/
> >> fabric8: http://fabric8.io/