Hi Franz, I would certainly prefer this approach. In our case we are handling large files so doing a copy means unnecessary delays and storage usage.
Regards, Geert On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer < emc2...@googlemail.com> wrote: > Hello Claus and Henryk, > > my original proposal to copy the stream cache file is not the optimal > solution. A better solution would be to have only one stream cache > file and to delete this file only when all exchanges which need this > file are done. This does mean we have to register listeners to the > event onDone of all UnitOfWorks of the relevant exchanges in the > stream cache file object and only when all listeners have got the > onDone event, then the file can be deleted. However this will require > quite some changes > > Probably we could also use this solution for the agregator and splitter > case.. > > Regards Franz > > On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <claus.ib...@gmail.com> > wrote: > > Hi > > > > Yeah we should likely have a StreamCacheHelper or introduce a > > copy(boolean clone) method that takes a boolean (with a better name) > > that indicate it does a indpendenent copy. Then we can keep the inner > > details how this copy does in those stream cache implementations. > > > > The wire tap already does a copy of the stream cache today. But it > > likely need that clone copy. We could also make that the default. > > Though I think multicast eip does a copy as well but it may reuse the > > same underlying file, and only delete it when last exchange is done > > and closes it. > > > > > > > > On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <hekon...@gmail.com> > wrote: > >> Hi, > >> > >> You can also use Wiretap's onPrepareRef option and use custom processor > to > >> copy the content of the cached body. > >> > >> Franz, would you be so kind and create a pull request with your fix? > >> Somebody will review it and merge. Thanks in advance! > >> > >> Cheers! > >> > >> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer < > >> emc2...@googlemail.com> napisał: > >> > >>> Hi Geert, > >>> > >>> it is a bug. You can try as a workaround to set the threshold > >>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge > >>> number; then the body will be kept in memory. > >>> > >>> Alternatively, you can modify the code of the Camel class > >>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the > >>> method configureCopyExchange in the following way: > >>> > >>> > >>> private Exchange configureCopyExchange(Exchange exchange) throws > >>> IOException { > >>> > >>> > >>> // must use a copy as we dont want it to cause side effects of > >>> the original exchange > >>> Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, > >>> false); > >>> > >>> if (copy.getIn().getBody() instanceof FileInputStreamCache) { > >>> //the file stream must be copied, otherwise you get errors > >>> because the stream file is removed when the parent route is finished > >>> FileInputStreamCache streamCache = (FileInputStreamCache) > >>> exchange.getIn().getBody(); > >>> CachedOutputStream cos = new CachedOutputStream(copy); > >>> try { > >>> IOHelper.copy(streamCache, cos); > >>> } finally { > >>> IOHelper.close(streamCache, cos); > >>> streamCache.reset(); > >>> } > >>> copy.getIn().setBody(cos.newStreamCache()); > >>> } > >>> // set MEP to InOnly as this wire tap is a fire and forget > >>> copy.setPattern(ExchangePattern.InOnly); > >>> return copy; > >>> } > >>> > >>> The idea behind this is to make a copy of the stream cache file, so > >>> that you get an additional stream cache file for the second route (in > >>> your case for the route "direct:x"). This second stream cache file > >>> will be deleted when the second route is finished. > >>> > >>> I also hope that this issue will be fixed. I am no committer so I > >>> cannot say when this issue will be solved; I have made contributions > >>> which solved a similar problem in the aggregator and splitter. > >>> > >>> I think you can open a Jira ticket with the above solution suggestion. > >>> > >>> Regards Franz > >>> > >>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden > >>> <geer...@aviovision.com> wrote: > >>> > Hi Franz, > >>> > > >>> > is this something that will be fixed in an upcoming release? Is it a > bug > >>> or > >>> > does it work as designed? > >>> > Can we use a workaround to avoid this behaviour, for example by not > >>> > deleting the temp files? > >>> > > >>> > > >>> > Kind regards, > >>> > > >>> > Geert > >>> > > >>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer < > >>> > emc2...@googlemail.com> wrote: > >>> > > >>> >> Hello Geert, > >>> >> > >>> >> there is no solution yet for your problem. Currently the stream > cache > >>> >> file is removed at the end of the route which created the file. In > >>> >> your case the stream cache file is deleted when the "direct:start" > >>> >> route is finished. The wire tap runs in a separate thread and > >>> >> therefore it can happen that it tries to read the cached file when > it > >>> >> is already deleted, especially when you have a delay in the wiretap > >>> >> route ("direct:x"). > >>> >> > >>> >> > >>> >> Regards Franz > >>> >> > >>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden > >>> >> <geer...@aviovision.com> wrote: > >>> >> > Hi, > >>> >> > > >>> >> > I noticed a bug where the body (StreamCache) was already removed > >>> before > >>> >> the > >>> >> > exchange reached the end (in the Wiretap route). > >>> >> > > >>> >> > I found the following ticket > >>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code > >>> >> > > >>> >> > >>> > https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7 > >>> >> > but I think it still doesn't fix the Wiretap problem. > >>> >> > > >>> >> > Here you can find my test (executed on 2.15.1). If you disable the > >>> >> > StreamCaching or remove the delay it works, enabling it again will > >>> break > >>> >> > the test. > >>> >> > > >>> >> > ============ > >>> >> > import org.apache.camel.builder.RouteBuilder; > >>> >> > import org.apache.camel.component.mock.MockEndpoint; > >>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy; > >>> >> > import org.apache.camel.spi.StreamCachingStrategy; > >>> >> > import org.apache.camel.test.junit4.CamelTestSupport; > >>> >> > import org.junit.Before; > >>> >> > import org.junit.Test; > >>> >> > > >>> >> > public class WireTapTest extends CamelTestSupport { > >>> >> > > >>> >> > private MockEndpoint y; > >>> >> > private MockEndpoint z; > >>> >> > > >>> >> > @Before > >>> >> > public void prepareEndpoints() { > >>> >> > y = getMockEndpoint("mock:file:y"); > >>> >> > z = getMockEndpoint("mock:file:z"); > >>> >> > } > >>> >> > > >>> >> > @Test > >>> >> > public void > >>> >> > > >>> >> > >>> > testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete() > >>> >> > throws InterruptedException { > >>> >> > y.expectedMessageCount(1); > >>> >> > z.expectedMessageCount(1); > >>> >> > > >>> >> > // test.txt should contain more than one character > >>> >> > template.sendBody("direct:start", > >>> >> > this.getClass().getResourceAsStream("/test.txt")); > >>> >> > > >>> >> > assertMockEndpointsSatisfied(); > >>> >> > } > >>> >> > > >>> >> > @Override > >>> >> > protected RouteBuilder createRouteBuilder() throws Exception { > >>> >> > return new RouteBuilder() { > >>> >> > @Override > >>> >> > public void configure() throws Exception { > >>> >> > StreamCachingStrategy streamCachingStrategy = new > >>> >> > DefaultStreamCachingStrategy(); > >>> >> > streamCachingStrategy.setSpoolThreshold(1); > >>> >> > context.setStreamCachingStrategy(streamCachingStrategy); > >>> >> > context.setStreamCaching(true); > >>> >> > > >>> >> > from("direct:start") > >>> >> > .wireTap("direct:x") > >>> >> > .to("file:y"); > >>> >> > > >>> >> > from("direct:x") > >>> >> > .delay(2000) > >>> >> > .to("file:z"); > >>> >> > } > >>> >> > }; > >>> >> > } > >>> >> > > >>> >> > @Override > >>> >> > public String isMockEndpoints() { > >>> >> > return "(file:z|file:y)"; > >>> >> > } > >>> >> > } > >>> >> > ============= > >>> >> > > >>> >> > If you run the test you can clearly see the temp file deletion > >>> followed > >>> >> by > >>> >> > the closed stream exception: > >>> >> > > >>> >> > Tried 1 to delete file: > >>> >> > > >>> >> > >>> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp > >>> >> > with result: true > >>> >> > > >>> >> > Cannot reset stream from file > >>> >> > > >>> >> > >>> > /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp > >>> >> > > >>> >> > I encountered the same issue during a more complex route that does > >>> some > >>> >> > splitting (zip file) and multicasting. This occurred on Camel > 2.14.1 > >>> so > >>> >> it > >>> >> > could be fixed by > https://issues.apache.org/jira/browse/CAMEL-8284 > >>> but I > >>> >> > need to test this. > >>> >> > > >>> >> > Kind regards, > >>> >> > > >>> >> > Geert > >>> >> > >>> > > > > > > > > -- > > Claus Ibsen > > ----------------- > > Red Hat, Inc. > > Email: cib...@redhat.com > > Twitter: davsclaus > > Blog: http://davsclaus.com > > Author of Camel in Action: http://www.manning.com/ibsen > > hawtio: http://hawt.io/ > > fabric8: http://fabric8.io/ >