Hi

Yeah we should likely have a StreamCacheHelper or introduce a
copy(boolean clone) method that takes a boolean (with a better name)
that indicate it does a indpendenent copy. Then we can keep the inner
details how this copy does in those stream cache implementations.

The wire tap already does a copy of the stream cache today. But it
likely need that clone copy. We could also make that the default.
Though I think multicast eip does a copy as well but it may reuse the
same underlying file, and only delete it when last exchange is done
and closes it.



On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <hekon...@gmail.com> wrote:
> Hi,
>
> You can also use Wiretap's onPrepareRef option and use custom processor to
> copy the content of the cached body.
>
> Franz, would you be so kind and create a pull request with your fix?
> Somebody will review it and merge. Thanks in advance!
>
> Cheers!
>
> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
> emc2...@googlemail.com> napisał:
>
>> Hi Geert,
>>
>> it is a bug. You can try as a workaround to set the threshold
>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>> number; then the body will be kept in memory.
>>
>> Alternatively, you can modify the code of the Camel class
>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
>> method configureCopyExchange in the following way:
>>
>>
>>    private Exchange configureCopyExchange(Exchange exchange) throws
>> IOException {
>>
>>
>>         // must use a copy as we dont want it to cause side effects of
>> the original exchange
>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
>> false);
>>
>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>             //the file stream must be copied, otherwise you get errors
>> because the stream file is removed when the parent route is finished
>>             FileInputStreamCache streamCache = (FileInputStreamCache)
>> exchange.getIn().getBody();
>>             CachedOutputStream cos = new CachedOutputStream(copy);
>>             try {
>>               IOHelper.copy(streamCache, cos);
>>             } finally {
>>               IOHelper.close(streamCache, cos);
>>               streamCache.reset();
>>             }
>>             copy.getIn().setBody(cos.newStreamCache());
>>         }
>>         // set MEP to InOnly as this wire tap is a fire and forget
>>         copy.setPattern(ExchangePattern.InOnly);
>>         return copy;
>>     }
>>
>> The idea behind this is to make a copy of the stream cache file, so
>> that you get an additional stream cache file for the second route (in
>> your case for the route "direct:x"). This second stream cache file
>> will be deleted when the second route is finished.
>>
>> I also hope that this issue will be fixed. I am no committer so I
>> cannot say when this issue will be solved; I have made contributions
>> which solved a similar problem in the aggregator and splitter.
>>
>> I think you can open a Jira ticket with the above solution suggestion.
>>
>> Regards Franz
>>
>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>> <geer...@aviovision.com> wrote:
>> > Hi Franz,
>> >
>> > is this something that will be fixed in an upcoming release? Is it a bug
>> or
>> > does it work as designed?
>> > Can we use a workaround to avoid this behaviour, for example by not
>> > deleting the temp files?
>> >
>> >
>> > Kind regards,
>> >
>> > Geert
>> >
>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
>> > emc2...@googlemail.com> wrote:
>> >
>> >> Hello Geert,
>> >>
>> >> there is no solution yet for your problem. Currently the stream cache
>> >> file is removed at the end of the route which created the file. In
>> >> your case the stream cache file is deleted when the "direct:start"
>> >> route is finished. The wire tap runs in a separate thread and
>> >> therefore it can happen that it tries to read the cached file when it
>> >> is already deleted, especially when you have a delay in the wiretap
>> >> route ("direct:x").
>> >>
>> >>
>> >> Regards Franz
>> >>
>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>> >> <geer...@aviovision.com> wrote:
>> >> > Hi,
>> >> >
>> >> > I noticed a bug where the body (StreamCache) was already removed
>> before
>> >> the
>> >> > exchange reached the end (in the Wiretap route).
>> >> >
>> >> > I found the following ticket
>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>> >> >
>> >>
>> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>> >> > but I think it still doesn't fix the Wiretap problem.
>> >> >
>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>> >> > StreamCaching or remove the delay it works, enabling it again will
>> break
>> >> > the test.
>> >> >
>> >> > ============
>> >> > import org.apache.camel.builder.RouteBuilder;
>> >> > import org.apache.camel.component.mock.MockEndpoint;
>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>> >> > import org.junit.Before;
>> >> > import org.junit.Test;
>> >> >
>> >> > public class WireTapTest extends CamelTestSupport {
>> >> >
>> >> > private MockEndpoint y;
>> >> > private MockEndpoint z;
>> >> >
>> >> > @Before
>> >> > public void prepareEndpoints() {
>> >> > y = getMockEndpoint("mock:file:y");
>> >> > z = getMockEndpoint("mock:file:z");
>> >> > }
>> >> >
>> >> > @Test
>> >> > public void
>> >> >
>> >>
>> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>> >> > throws InterruptedException {
>> >> > y.expectedMessageCount(1);
>> >> > z.expectedMessageCount(1);
>> >> >
>> >> > // test.txt should contain more than one character
>> >> > template.sendBody("direct:start",
>> >> > this.getClass().getResourceAsStream("/test.txt"));
>> >> >
>> >> > assertMockEndpointsSatisfied();
>> >> > }
>> >> >
>> >> > @Override
>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
>> >> > return new RouteBuilder() {
>> >> > @Override
>> >> > public void configure() throws Exception {
>> >> > StreamCachingStrategy streamCachingStrategy = new
>> >> > DefaultStreamCachingStrategy();
>> >> > streamCachingStrategy.setSpoolThreshold(1);
>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
>> >> > context.setStreamCaching(true);
>> >> >
>> >> > from("direct:start")
>> >> > .wireTap("direct:x")
>> >> > .to("file:y");
>> >> >
>> >> > from("direct:x")
>> >> > .delay(2000)
>> >> > .to("file:z");
>> >> > }
>> >> > };
>> >> > }
>> >> >
>> >> > @Override
>> >> > public String isMockEndpoints() {
>> >> > return "(file:z|file:y)";
>> >> > }
>> >> > }
>> >> > =============
>> >> >
>> >> > If you run the test you can clearly see the temp file deletion
>> followed
>> >> by
>> >> > the closed stream exception:
>> >> >
>> >> > Tried 1 to delete file:
>> >> >
>> >>
>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >> > with result: true
>> >> >
>> >> > Cannot reset stream from file
>> >> >
>> >>
>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>> >> >
>> >> > I encountered the same issue during a more complex route that does
>> some
>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1
>> so
>> >> it
>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
>> but I
>> >> > need to test this.
>> >> >
>> >> > Kind regards,
>> >> >
>> >> > Geert
>> >>
>>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Reply via email to