Author: davsclaus
Date: Wed Dec  7 10:09:46 2011
New Revision: 1211366

URL: http://svn.apache.org/viewvc?rev=1211366&view=rev
Log:
CAMEL-4748: Ensure old stream is closed when re-initializing or closing consumer

Modified:
    camel/branches/camel-2.8.x/   (props changed)
    camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec  7 10:09:46 2011
@@ -1 +1 @@
-/camel/trunk:1202148,1202167,1202204-1202206,1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210771,1210830
+/camel/trunk:1202148,1202167,1202204-1202206,1202215,1202223,1202659,1202685,1203879,1203978,1204338,1205124,1205372,1205412,1205429,1205431,1205713,1206116,1206414,1207743,1207784,1208301,1208930,1208964-1208965,1209006-1209007,1209382,1209401,1209477,1209845-1209846,1210113,1210771,1210830,1211363

Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1211366&r1=1211365&r2=1211366&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/branches/camel-2.8.x/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Dec  7 10:09:46 2011
@@ -36,6 +36,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,8 @@ public class StreamConsumer extends Defa
     private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
     private ExecutorService executor;
-    private InputStream inputStream = System.in;
+    private volatile InputStream inputStream = System.in;
+    private volatile InputStream inputStreamToClose;
     private StreamEndpoint endpoint;
     private String uri;
     private boolean initialPromptDone;
@@ -86,6 +88,9 @@ public class StreamConsumer extends Defa
             executor = null;
         }
         lines.clear();
+
+        // do not close regular inputStream as it may be System.in etc.
+        IOHelper.close(inputStreamToClose);
         super.doStop();
     }
 
@@ -98,16 +103,21 @@ public class StreamConsumer extends Defa
     }
 
     private BufferedReader initializeStream() throws Exception {
+        // close old stream, before obtaining a new stream
+        IOHelper.close(inputStreamToClose);
+
         if ("in".equals(uri)) {
             inputStream = System.in;
+            inputStreamToClose = null;
         } else if ("file".equals(uri)) {
             inputStream = resolveStreamFromFile();
+            inputStreamToClose = inputStream;
         } else if ("url".equals(uri)) {
             inputStream = resolveStreamFromUrl();
+            inputStreamToClose = inputStream;
         }
         Charset charset = endpoint.getCharset();
-        BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset));
-        return br;
+        return new BufferedReader(new InputStreamReader(inputStream, charset));
     }
 
     private void readFromStream() throws Exception {

Modified: camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=1211366&r1=1211365&r2=1211366&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original)
+++ camel/branches/camel-2.8.x/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Wed Dec  7 10:09:46 2011
@@ -51,10 +51,13 @@ public class ScanStreamFileTest extends 
         mock.expectedMinimumMessageCount(2);
 
         FileOutputStream fos = new FileOutputStream(file);
-        fos.write("Hello\n".getBytes());
-        Thread.sleep(150);
-        fos.write("World\n".getBytes());
-        fos.close();
+        try {
+            fos.write("Hello\n".getBytes());
+            Thread.sleep(150);
+            fos.write("World\n".getBytes());
+        } finally {
+            fos.close();
+        }
         
         assertMockEndpointsSatisfied();
     }
@@ -65,17 +68,20 @@ public class ScanStreamFileTest extends 
         mock.expectedMinimumMessageCount(3);
 
         FileOutputStream fos = refreshFile(null);
-        fos.write("Hello\n".getBytes());        
-        Thread.sleep(150);
-        fos = refreshFile(fos);
-        fos.write("there\n".getBytes());
-        Thread.sleep(150);
-        fos = refreshFile(fos);
-        fos.write("World\n".getBytes());
-        Thread.sleep(150);
-        fos = refreshFile(fos);
-        fos.write("!\n".getBytes());
-        fos.close();
+        try {
+            fos.write("Hello\n".getBytes());
+            Thread.sleep(150);
+            fos = refreshFile(fos);
+            fos.write("there\n".getBytes());
+            Thread.sleep(150);
+            fos = refreshFile(fos);
+            fos.write("World\n".getBytes());
+            Thread.sleep(150);
+            fos = refreshFile(fos);
+            fos.write("!\n".getBytes());
+        } finally {
+            fos.close();
+        }
 
         assertMockEndpointsSatisfied();
     }


Reply via email to