This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new e74039a CAMEL-15557: Stream Cache file not deleted (#4260) e74039a is described below commit e74039aedc5a915d65a6b9b30da5b4ba3d00d305 Author: forsthofer <forstho...@users.noreply.github.com> AuthorDate: Mon Sep 21 06:38:02 2020 +0200 CAMEL-15557: Stream Cache file not deleted (#4260) If you have a route with a Multicast with parallel processing and a timeout and a sub-route in the multicast which is creating an OutputStreamCache before the timeout and is writing to the OutputStreamCache after the timeout then the created file is never deleted from the file system. Co-authored-by: Franz Forsthofer <franz.forstho...@sap.com> --- .../MulticastParallelTimeoutStreamCachingTest.java | 36 +++++++++++++++++++++- .../converter/stream/FileInputStreamCache.java | 11 +++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java index e0e216f..6e39f10 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java @@ -19,6 +19,7 @@ package org.apache.camel.processor; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FilterInputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import org.apache.camel.ContextTestSupport; @@ -26,6 +27,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.converter.stream.CachedOutputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -61,7 +63,7 @@ public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSuppor } @Test - public void testSendingAMessageUsingMulticastConvertsToReReadable() throws Exception { + public void testCreateOutputStreamCacheAfterTimeout() throws Exception { getMockEndpoint("mock:x").expectedBodiesReceived(BODY_STRING); template.sendBody("direct:a", "testMessage"); @@ -73,6 +75,15 @@ public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSuppor File[] files = f.listFiles(); assertEquals(0, files.length); } + + @Test + public void testCreateOutputStreamCacheBeforeTimeoutButWriteToOutputStreamCacheAfterTimeout() throws Exception { + getMockEndpoint("mock:exception").expectedMessageCount(1); + getMockEndpoint("mock:y").expectedMessageCount(0); + + template.sendBody("direct:b", "testMessage"); + assertMockEndpointsSatisfied(); + } @Override protected RouteBuilder createRouteBuilder() throws Exception { @@ -91,6 +102,23 @@ public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSuppor }); } }; + + final Processor processor2 = new Processor() { + public void process(Exchange exchange) throws IOException { + // create first the OutputStreamCache and then sleep + CachedOutputStream outputStream = new CachedOutputStream(exchange); + try { + // sleep for one second so that the write to the CachedOutputStream happens after the main exchange has finished due to timeout on the multicast + Thread.sleep(1000l); + } catch (InterruptedException e) { + throw new IllegalStateException("Unexpected exception", e); + } + outputStream.write(BODY); + Message in = exchange.getIn(); + // use FilterInputStream to trigger streamcaching + in.setBody(outputStream.getInputStream()); + } + }; return new RouteBuilder() { public void configure() { @@ -101,10 +129,16 @@ public class MulticastParallelTimeoutStreamCachingTest extends ContextTestSuppor context.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(false); context.getStreamCachingStrategy().setSpoolThreshold(1L); context.setStreamCaching(true); + + onException(IOException.class).to("mock:exception"); from("direct:a").multicast().timeout(500L).parallelProcessing().to("direct:x"); from("direct:x").process(processor1).to("mock:x"); + + from("direct:b").multicast().timeout(500l).parallelProcessing().to("direct:y"); + + from("direct:y").process(processor2).to("mock:y"); } }; } diff --git a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java index 6e77c91..3dcae3f 100644 --- a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java +++ b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java @@ -251,6 +251,17 @@ public final class FileInputStreamCache extends InputStream implements StreamCac if (tempFile != null) { throw new IllegalStateException("The method 'createOutputStream' can only be called once!"); } + if (closedOnCompletion && exchangeCounter.get() == 0) { + // exchange was already stopped -> in this case the tempFile would never be deleted. + // This can happen when in the splitter or Multi-cast case with parallel processing, the CachedOutputStream is created when the main unit of work + // is still active, but has a timeout and after the timeout which stops the unit of work the FileOutputStream is created. + // We only can throw here an Exception and inform the user that the processing took longer than the set timeout. + String error = "Cannot create a FileOutputStream for Stream Caching, because this FileOutputStream would never be removed from the file system." + + " This situation can happen with a Splitter or Multi Cast in parallel processing if there is a timeout set on the Splitter or Multi Cast, " + + " and the processing in a sub-branch takes longer than the timeout. Consider to increase the timeout."; + LOG.error(error); + throw new IOException(error); + } tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory()); LOG.trace("Creating temporary stream cache file: {}", tempFile);