Author: davsclaus
Date: Wed Dec 7 10:05:08 2011
New Revision: 1211363

URL: http://svn.apache.org/viewvc?rev=1211363&view=rev
Log:
CAMEL-4748: Ensure old stream is closed when re-initializing or closing consumer

Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1211363&r1=1211362&r2=1211363&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original) +++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Dec 7 10:05:08 2011 @@ -36,6 +36,7 @@ import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,8 @@ public class StreamConsumer extends Defa private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'"; private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(",")); private ExecutorService executor; - private InputStream inputStream = System.in; + private volatile InputStream inputStream = System.in; + private volatile InputStream inputStreamToClose; private StreamEndpoint endpoint; private String uri; private boolean initialPromptDone; @@ -86,6 +88,9 @@ public class StreamConsumer extends Defa executor = null; } lines.clear(); + + // do not close regular inputStream as it may be System.in etc. 
+ IOHelper.close(inputStreamToClose); super.doStop(); } @@ -98,16 +103,21 @@ public class StreamConsumer extends Defa } private BufferedReader initializeStream() throws Exception { + // close old stream, before obtaining a new stream + IOHelper.close(inputStreamToClose); + if ("in".equals(uri)) { inputStream = System.in; + inputStreamToClose = null; } else if ("file".equals(uri)) { inputStream = resolveStreamFromFile(); + inputStreamToClose = inputStream; } else if ("url".equals(uri)) { inputStream = resolveStreamFromUrl(); + inputStreamToClose = inputStream; } Charset charset = endpoint.getCharset(); - BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset)); - return br; + return new BufferedReader(new InputStreamReader(inputStream, charset)); } private void readFromStream() throws Exception { Modified: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=1211363&r1=1211362&r2=1211363&view=diff ============================================================================== --- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original) +++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Wed Dec 7 10:05:08 2011 @@ -51,10 +51,13 @@ public class ScanStreamFileTest extends mock.expectedMinimumMessageCount(2); FileOutputStream fos = new FileOutputStream(file); - fos.write("Hello\n".getBytes()); - Thread.sleep(150); - fos.write("World\n".getBytes()); - fos.close(); + try { + fos.write("Hello\n".getBytes()); + Thread.sleep(150); + fos.write("World\n".getBytes()); + } finally { + fos.close(); + } assertMockEndpointsSatisfied(); } @@ -65,17 +68,20 @@ public class ScanStreamFileTest extends mock.expectedMinimumMessageCount(3); 
FileOutputStream fos = refreshFile(null); - fos.write("Hello\n".getBytes()); - Thread.sleep(150); - fos = refreshFile(fos); - fos.write("there\n".getBytes()); - Thread.sleep(150); - fos = refreshFile(fos); - fos.write("World\n".getBytes()); - Thread.sleep(150); - fos = refreshFile(fos); - fos.write("!\n".getBytes()); - fos.close(); + try { + fos.write("Hello\n".getBytes()); + Thread.sleep(150); + fos = refreshFile(fos); + fos.write("there\n".getBytes()); + Thread.sleep(150); + fos = refreshFile(fos); + fos.write("World\n".getBytes()); + Thread.sleep(150); + fos = refreshFile(fos); + fos.write("!\n".getBytes()); + } finally { + fos.close(); + } assertMockEndpointsSatisfied(); }