Author: davsclaus
Date: Wed Dec 7 10:09:46 2011
New Revision: 1211366

URL: http://svn.apache.org/viewvc?rev=1211366&view=rev
Log:
CAMEL-4748: Ensure old stream is closed when re-initializing or closing consumer

Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 7 10:09:46 2011
@@ -1 +1 @@
-/camel/trunk:1202148,1202167,1202204-1202206,1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210771,1210830
+/camel/trunk:1202148,1202167,1202204-1202206,1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210771,1210830,1211363

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1211366&r1=1211365&r2=1211366&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Dec 7 10:09:46 2011
@@ -36,6 +36,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,8 @@ public class StreamConsumer extends Defa
     private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
     private ExecutorService executor;
-    private InputStream inputStream = System.in;
+    private volatile InputStream inputStream = System.in;
+    private volatile InputStream inputStreamToClose;
     private StreamEndpoint endpoint;
     private String uri;
     private boolean initialPromptDone;
@@ -86,6 +88,9 @@ public class StreamConsumer extends Defa
             executor = null;
         }
         lines.clear();
+
+        // do not close regular inputStream as it may be System.in etc.
+        IOHelper.close(inputStreamToClose);
         super.doStop();
     }
 
@@ -98,16 +103,21 @@ public class StreamConsumer extends Defa
     }
 
     private BufferedReader initializeStream() throws Exception {
+        // close old stream, before obtaining a new stream
+        IOHelper.close(inputStreamToClose);
+
         if ("in".equals(uri)) {
             inputStream = System.in;
+            inputStreamToClose = null;
         } else if ("file".equals(uri)) {
             inputStream = resolveStreamFromFile();
+            inputStreamToClose = inputStream;
         } else if ("url".equals(uri)) {
             inputStream = resolveStreamFromUrl();
+            inputStreamToClose = inputStream;
         }
         Charset charset = endpoint.getCharset();
-        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset));
-        return br;
+        return new BufferedReader(new InputStreamReader(inputStream, charset));
     }
 
     private void readFromStream() throws Exception {

Modified: camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=1211366&r1=1211365&r2=1211366&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original)
+++ camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Wed Dec 7 10:09:46 2011
@@ -51,10 +51,13 @@ public class ScanStreamFileTest extends
         mock.expectedMinimumMessageCount(2);
 
         FileOutputStream fos = new FileOutputStream(file);
-        fos.write("Hello\n".getBytes());
-        Thread.sleep(150);
-        fos.write("World\n".getBytes());
-        fos.close();
+        try {
+            fos.write("Hello\n".getBytes());
+            Thread.sleep(150);
+            fos.write("World\n".getBytes());
+        } finally {
+            fos.close();
+        }
 
         assertMockEndpointsSatisfied();
     }
@@ -65,17 +68,20 @@ public class ScanStreamFileTest extends
         mock.expectedMinimumMessageCount(3);
 
         FileOutputStream fos = refreshFile(null);
-        fos.write("Hello\n".getBytes());
-        Thread.sleep(150);
-        fos = refreshFile(fos);
-        fos.write("there\n".getBytes());
-        Thread.sleep(150);
-        fos = refreshFile(fos);
-        fos.write("World\n".getBytes());
-        Thread.sleep(150);
-        fos = refreshFile(fos);
-        fos.write("!\n".getBytes());
-        fos.close();
+        try {
+            fos.write("Hello\n".getBytes());
+            Thread.sleep(150);
+            fos = refreshFile(fos);
+            fos.write("there\n".getBytes());
+            Thread.sleep(150);
+            fos = refreshFile(fos);
+            fos.write("World\n".getBytes());
+            Thread.sleep(150);
+            fos = refreshFile(fos);
+            fos.write("!\n".getBytes());
+        } finally {
+            fos.close();
+        }
 
         assertMockEndpointsSatisfied();
     }