Author: ningjiang
Date: Fri Jun 4 09:19:55 2010
New Revision: 951337
URL: http://svn.apache.org/viewvc?rev=951337&view=rev
Log:
CAMEL-2496,CAMEL-2776 delete the temp file only after route is completed
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=951337&r1=951336&r2=951337&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
Fri Jun 4 09:19:55 2010
@@ -30,6 +30,7 @@ import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
+import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
import org.apache.commons.logging.Log;
@@ -41,7 +42,8 @@ import org.apache.commons.logging.LogFac
* can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
* it will choose the directory which is set by the system property of "java.io.tmpdir".
* You can get a cached input stream of this stream. The temp file which is created with this
- * output stream will be deleted when you close this output stream or the cached inputStream.
+ * output stream will be deleted when you close this output stream or the all cached
+ * fileInputStream is closed after the exchange is completed.
*/
public class CachedOutputStream extends OutputStream {
public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
@@ -52,6 +54,7 @@ public class CachedOutputStream extends
private boolean inMemory = true;
private int totalLength;
private File tempFile;
+ private boolean exchangeOnCompleted;
private List<FileInputStreamCache> fileInputStreamCaches = new ArrayList<FileInputStreamCache>(4);
@@ -68,6 +71,27 @@ public class CachedOutputStream extends
this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
}
+ // add on completion so we can cleanup after the exchange is done such as deleting temporary files
+ exchange.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ try {
+ //set the flag so we can delete the temp file
+ exchangeOnCompleted = true;
+ if (fileInputStreamCaches.size() == 0) {
+ // there is no open fileInputStream let's close it
+ close();
+ }
+ } catch (Exception e) {
+ LOG.warn("Error deleting temporary cache file: " + tempFile, e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "OnCompletion[CachedOutputStream]";
+ }
+ });
}
public void flush() throws IOException {
@@ -76,20 +100,7 @@ public class CachedOutputStream extends
public void close() throws IOException {
currentStream.close();
- try {
- // cleanup temporary file
- if (tempFile != null) {
- boolean deleted = tempFile.delete();
- if (!deleted) {
- LOG.warn("Cannot delete temporary cache file: " + tempFile);
- } else if (LOG.isTraceEnabled()) {
- LOG.trace("Deleted temporary cache file: " + tempFile);
- }
- tempFile = null;
- }
- } catch (Exception e) {
- LOG.warn("Error deleting temporary cache file: " + tempFile, e);
- }
+ cleanUpTempFile();
}
public boolean equals(Object obj) {
@@ -168,6 +179,27 @@ public class CachedOutputStream extends
}
}
}
+
+ public void releaseFileInputStream(FileInputStreamCache fileInputStreamCache) throws IOException {
+ fileInputStreamCaches.remove(fileInputStreamCache);
+ if (exchangeOnCompleted && fileInputStreamCaches.size() == 0) {
+ // now we can close stream and delete the temp file
+ close();
+ }
+ }
+
+ private void cleanUpTempFile() {
+ // cleanup temporary file
+ if (tempFile != null) {
+ boolean deleted = tempFile.delete();
+ if (!deleted) {
+ LOG.warn("Cannot delete temporary cache file: " + tempFile);
+ } else if (LOG.isTraceEnabled()) {
+ LOG.trace("Deleted temporary cache file: " + tempFile);
+ }
+ tempFile = null;
+ }
+ }
private void pageToFileStream() throws IOException {
flush();
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=951337&r1=951336&r2=951337&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
Fri Jun 4 09:19:55 2010
@@ -44,9 +44,9 @@ public class FileInputStreamCache extend
if (isSteamOpened()) {
getInputStream().close();
}
- // when close the FileInputStreamCache we should also close the cachedOutputStream
+ // Just remove the itself from cachedOutputStream
if (cachedOutputStream != null) {
- cachedOutputStream.close();
+ cachedOutputStream.releaseFileInputStream(this);
}
} catch (Exception e) {
throw new RuntimeCamelException(e);
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java?rev=951337&r1=951336&r2=951337&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
Fri Jun 4 09:19:55 2010
@@ -76,7 +76,9 @@ public class CachedOutputStreamTest exte
String temp = toString((InputStream)cache);
((InputStream)cache).close();
+ assertEquals("we should have a temp file", files.length, 1);
assertEquals("Cached a wrong file", temp, TEST_STRING);
+ exchange.getUnitOfWork().done(exchange);
try {
cache.reset();
@@ -91,7 +93,7 @@ public class CachedOutputStreamTest exte
assertEquals("we should have no temp file", files.length, 0);
}
- public void testCacheStreamToFileAndNotCloseStream() throws IOException {
+ public void testCacheStreamToFileCloseStreamBeforeDone() throws IOException {
CachedOutputStream cos = new CachedOutputStream(exchange);
cos.write(TEST_STRING.getBytes("UTF-8"));
@@ -106,8 +108,9 @@ public class CachedOutputStreamTest exte
assertEquals("Cached a wrong file", temp, TEST_STRING);
cache.reset();
temp = toString((InputStream)cache);
- assertEquals("Cached a wrong file", temp, TEST_STRING);
-
+ assertEquals("Cached a wrong file", temp, TEST_STRING);
+ exchange.getUnitOfWork().done(exchange);
+ assertEquals("we should have a temp file", files.length, 1);
((InputStream)cache).close();
files = file.list();
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java?rev=951337&r1=951336&r2=951337&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterStreamCacheTest.java
Fri Jun 4 09:19:55 2010
@@ -43,13 +43,8 @@ public class SplitterStreamCacheTest ext
private static final String TEST_FILE = "org/apache/camel/converter/stream/test.xml";
protected int numMessages = 1000;
-
- public void testDummy() {
- // noop
- }
-
- // TODO: Disabled due it fails
- public void xxxTestSendStreamSource() throws Exception {
+
+ public void testSendStreamSource() throws Exception {
MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
resultEndpoint.expectedMessageCount(numMessages);