On Wed, Apr 22, 2015 at 10:17 AM, Franz Paul Forsthofer
<emc2...@googlemail.com> wrote:
> Hello Claus and Henryk,
>
> my original proposal to copy the stream cache file is not the optimal
> solution. A better solution would be to have only one stream cache
> file and to delete this file only when all exchanges which need this
> file are done. This does mean we have to register listeners to the
> event onDone of all UnitOfWorks of the relevant exchanges in the
> stream cache file object and only when all listeners have got the
> onDone event, then the file can be deleted.  However this will require
> quite some changes
>
> Probably we could also use this solution for the agregator and splitter case..
>

Yeah though that entails the stream cache needs to keep this state and
be tailored to not only 1 exchange but to potentially N+ exchanges.

But a good idea nevertheless. Especially for big streams.

Logging a JIRA ticket is welcome.




> Regards Franz
>
> On Wed, Apr 22, 2015 at 9:47 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:
>> Hi
>>
>> Yeah we should likely have a StreamCacheHelper or introduce a
>> copy(boolean clone) method that takes a boolean (with a better name)
>> that indicate it does a indpendenent copy. Then we can keep the inner
>> details how this copy does in those stream cache implementations.
>>
>> The wire tap already does a copy of the stream cache today. But it
>> likely need that clone copy. We could also make that the default.
>> Though I think multicast eip does a copy as well but it may reuse the
>> same underlying file, and only delete it when last exchange is done
>> and closes it.
>>
>>
>>
>> On Wed, Apr 22, 2015 at 8:13 AM, Henryk Konsek <hekon...@gmail.com> wrote:
>>> Hi,
>>>
>>> You can also use Wiretap's onPrepareRef option and use custom processor to
>>> copy the content of the cached body.
>>>
>>> Franz, would you be so kind and create a pull request with your fix?
>>> Somebody will review it and merge. Thanks in advance!
>>>
>>> Cheers!
>>>
>>> wt., 21.04.2015 o 16:25 użytkownik Franz Paul Forsthofer <
>>> emc2...@googlemail.com> napisał:
>>>
>>>> Hi Geert,
>>>>
>>>> it is a bug. You can try as a workaround to set the threshold
>>>> (streamCachingStrategy.setSpoolThreshold(huge_number);) to a huge
>>>> number; then the body will be kept in memory.
>>>>
>>>> Alternatively, you can modify the code of the Camel class
>>>> org.apache.camel.processor.WireTapProcessor. You have to modifiy the
>>>> method configureCopyExchange in the following way:
>>>>
>>>>
>>>>    private Exchange configureCopyExchange(Exchange exchange) throws
>>>> IOException {
>>>>
>>>>
>>>>         // must use a copy as we dont want it to cause side effects of
>>>> the original exchange
>>>>         Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange,
>>>> false);
>>>>
>>>>         if (copy.getIn().getBody() instanceof FileInputStreamCache) {
>>>>             //the file stream must be copied, otherwise you get errors
>>>> because the stream file is removed when the parent route is finished
>>>>             FileInputStreamCache streamCache = (FileInputStreamCache)
>>>> exchange.getIn().getBody();
>>>>             CachedOutputStream cos = new CachedOutputStream(copy);
>>>>             try {
>>>>               IOHelper.copy(streamCache, cos);
>>>>             } finally {
>>>>               IOHelper.close(streamCache, cos);
>>>>               streamCache.reset();
>>>>             }
>>>>             copy.getIn().setBody(cos.newStreamCache());
>>>>         }
>>>>         // set MEP to InOnly as this wire tap is a fire and forget
>>>>         copy.setPattern(ExchangePattern.InOnly);
>>>>         return copy;
>>>>     }
>>>>
>>>> The idea behind this is to make a copy of the stream cache file, so
>>>> that you get an additional stream cache file for the second route (in
>>>> your case for the route "direct:x"). This second stream cache file
>>>> will be deleted when the second route is finished.
>>>>
>>>> I also hope that this issue will be fixed. I am no committer so I
>>>> cannot say when this issue will be solved; I have made contributions
>>>> which solved a similar problem in the aggregator and splitter.
>>>>
>>>> I think you can open a Jira ticket with the above solution suggestion.
>>>>
>>>> Regards Franz
>>>>
>>>> On Tue, Apr 21, 2015 at 11:13 AM, Geert Vanheusden
>>>> <geer...@aviovision.com> wrote:
>>>> > Hi Franz,
>>>> >
>>>> > is this something that will be fixed in an upcoming release? Is it a bug
>>>> or
>>>> > does it work as designed?
>>>> > Can we use a workaround to avoid this behaviour, for example by not
>>>> > deleting the temp files?
>>>> >
>>>> >
>>>> > Kind regards,
>>>> >
>>>> > Geert
>>>> >
>>>> > On Tue, Apr 21, 2015 at 10:37 AM, Franz Paul Forsthofer <
>>>> > emc2...@googlemail.com> wrote:
>>>> >
>>>> >> Hello Geert,
>>>> >>
>>>> >> there is no solution yet for your problem. Currently the stream cache
>>>> >> file is removed at the end of the route which created the file. In
>>>> >> your case the stream cache file is deleted when the "direct:start"
>>>> >> route is finished. The wire tap runs in a separate thread and
>>>> >> therefore it can happen that it tries to read the cached file when it
>>>> >> is already deleted, especially when you have a delay in the wiretap
>>>> >> route ("direct:x").
>>>> >>
>>>> >>
>>>> >> Regards Franz
>>>> >>
>>>> >> On Fri, Apr 17, 2015 at 6:05 PM, Geert Vanheusden
>>>> >> <geer...@aviovision.com> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > I noticed a bug where the body (StreamCache) was already removed
>>>> before
>>>> >> the
>>>> >> > exchange reached the end (in the Wiretap route).
>>>> >> >
>>>> >> > I found the following ticket
>>>> >> > https://issues.apache.org/jira/browse/CAMEL-8386 and code
>>>> >> >
>>>> >>
>>>> https://fisheye6.atlassian.com/changelog/camel-git?cs=4661cbb94513d6047e58581b23dcd4a6fad166f7
>>>> >> > but I think it still doesn't fix the Wiretap problem.
>>>> >> >
>>>> >> > Here you can find my test (executed on 2.15.1). If you disable the
>>>> >> > StreamCaching or remove the delay it works, enabling it again will
>>>> break
>>>> >> > the test.
>>>> >> >
>>>> >> > ============
>>>> >> > import org.apache.camel.builder.RouteBuilder;
>>>> >> > import org.apache.camel.component.mock.MockEndpoint;
>>>> >> > import org.apache.camel.impl.DefaultStreamCachingStrategy;
>>>> >> > import org.apache.camel.spi.StreamCachingStrategy;
>>>> >> > import org.apache.camel.test.junit4.CamelTestSupport;
>>>> >> > import org.junit.Before;
>>>> >> > import org.junit.Test;
>>>> >> >
>>>> >> > public class WireTapTest extends CamelTestSupport {
>>>> >> >
>>>> >> > private MockEndpoint y;
>>>> >> > private MockEndpoint z;
>>>> >> >
>>>> >> > @Before
>>>> >> > public void prepareEndpoints() {
>>>> >> > y = getMockEndpoint("mock:file:y");
>>>> >> > z = getMockEndpoint("mock:file:z");
>>>> >> > }
>>>> >> >
>>>> >> > @Test
>>>> >> > public void
>>>> >> >
>>>> >>
>>>> testSendingAMessageUsingWiretapShouldNotDeleteStreamBeforeWiretappedExcangeIsComplete()
>>>> >> > throws InterruptedException {
>>>> >> > y.expectedMessageCount(1);
>>>> >> > z.expectedMessageCount(1);
>>>> >> >
>>>> >> > // test.txt should contain more than one character
>>>> >> > template.sendBody("direct:start",
>>>> >> > this.getClass().getResourceAsStream("/test.txt"));
>>>> >> >
>>>> >> > assertMockEndpointsSatisfied();
>>>> >> > }
>>>> >> >
>>>> >> > @Override
>>>> >> > protected RouteBuilder createRouteBuilder() throws Exception {
>>>> >> > return new RouteBuilder() {
>>>> >> > @Override
>>>> >> > public void configure() throws Exception {
>>>> >> > StreamCachingStrategy streamCachingStrategy = new
>>>> >> > DefaultStreamCachingStrategy();
>>>> >> > streamCachingStrategy.setSpoolThreshold(1);
>>>> >> > context.setStreamCachingStrategy(streamCachingStrategy);
>>>> >> > context.setStreamCaching(true);
>>>> >> >
>>>> >> > from("direct:start")
>>>> >> > .wireTap("direct:x")
>>>> >> > .to("file:y");
>>>> >> >
>>>> >> > from("direct:x")
>>>> >> > .delay(2000)
>>>> >> > .to("file:z");
>>>> >> > }
>>>> >> > };
>>>> >> > }
>>>> >> >
>>>> >> > @Override
>>>> >> > public String isMockEndpoints() {
>>>> >> > return "(file:z|file:y)";
>>>> >> > }
>>>> >> > }
>>>> >> > =============
>>>> >> >
>>>> >> > If you run the test you can clearly see the temp file deletion
>>>> followed
>>>> >> by
>>>> >> > the closed stream exception:
>>>> >> >
>>>> >> > Tried 1 to delete file:
>>>> >> >
>>>> >>
>>>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>>> >> > with result: true
>>>> >> >
>>>> >> > Cannot reset stream from file
>>>> >> >
>>>> >>
>>>> /var/folders/db/brq60fqj4vb8mnx_5nlz36nw0000gn/T/camel/camel-tmp-00cd1ce2-7d44-47fe-b357-008e8146f770/cos8797132745923044996.tmp
>>>> >> >
>>>> >> > I encountered the same issue during a more complex route that does
>>>> some
>>>> >> > splitting (zip file) and multicasting. This occurred on Camel 2.14.1
>>>> so
>>>> >> it
>>>> >> > could be fixed by https://issues.apache.org/jira/browse/CAMEL-8284
>>>> but I
>>>> >> > need to test this.
>>>> >> >
>>>> >> > Kind regards,
>>>> >> >
>>>> >> > Geert
>>>> >>
>>>>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cib...@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen
>> hawtio: http://hawt.io/
>> fabric8: http://fabric8.io/



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Reply via email to