Author: gnodet
Date: Tue Jul 16 07:39:19 2013
New Revision: 1503588

URL: http://svn.apache.org/r1503588
Log:
[KARAF-2738] Shell:exec doesn't show full output of commands
Also fix some synchronisation issues

Modified:
    
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/PumpStreamHandler.java
    
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/StreamPumper.java

Modified: 
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/PumpStreamHandler.java
URL: 
http://svn.apache.org/viewvc/karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/PumpStreamHandler.java?rev=1503588&r1=1503587&r2=1503588&view=diff
==============================================================================
--- 
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/PumpStreamHandler.java
 (original)
+++ 
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/PumpStreamHandler.java
 Tue Jul 16 07:39:19 2013
@@ -155,6 +155,9 @@ public class PumpStreamHandler {
             catch (InterruptedException e) {
                 // ignore
             }
+            try {
+                outputPump.getIn().close();
+            } catch (IOException e) { }
         }
 
         if (errorPump != null) {
@@ -165,10 +168,16 @@ public class PumpStreamHandler {
             catch (InterruptedException e) {
                 // ignore
             }
+            try {
+                errorPump.getIn().close();
+            } catch (IOException e) { }
         }
 
         if (inputPump != null) {
             inputPump.stop();
+            try {
+                inputPump.getOut().close();
+            } catch (IOException e) { }
         }
 
         try {
@@ -235,6 +244,7 @@ public class PumpStreamHandler {
         assert out != null;
 
         StreamPumper pumper = new StreamPumper(in, out, closeWhenExhausted);
+        pumper.setNonBlocking(true);
         pumper.setAutoflush(true);
         return pumper;
     }

Modified: 
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/StreamPumper.java
URL: 
http://svn.apache.org/viewvc/karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/StreamPumper.java?rev=1503588&r1=1503587&r2=1503588&view=diff
==============================================================================
--- 
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/StreamPumper.java
 (original)
+++ 
karaf/branches/karaf-2.3.x/util/src/main/java/org/apache/karaf/util/process/StreamPumper.java
 Tue Jul 16 07:39:19 2013
@@ -37,6 +37,8 @@ public class StreamPumper implements Run
 
     private boolean closeWhenExhausted;
 
+    private boolean nonBlocking;
+
     private boolean autoflush;
 
     private Throwable exception;
@@ -45,6 +47,8 @@ public class StreamPumper implements Run
 
     private boolean started;
 
+    private Thread thread;
+
     /**
      * Create a new stream pumper.
      *
@@ -72,6 +76,14 @@ public class StreamPumper implements Run
         this(in, out, false);
     }
 
+    public InputStream getIn() {
+        return in;
+    }
+
+    public OutputStream getOut() {
+        return out;
+    }
+
     /**
      * Set whether data should be flushed through to the output stream.
      *
@@ -82,6 +94,14 @@ public class StreamPumper implements Run
     }
 
     /**
+     * Set whether data should be read in a non blocking way.
+     * @param nonBlocking   If true, data will be read in a non blocking mode
+     */
+    public void setNonBlocking(boolean nonBlocking) {
+        this.nonBlocking = nonBlocking;
+    }
+
+    /**
      * Copies data from the input stream to the output stream.
      *
      * Terminates as soon as the input stream is closed or an error occurs.
@@ -89,28 +109,48 @@ public class StreamPumper implements Run
     public void run() {
         synchronized (this) {
             started = true;
+            finished = false;
+            finish = false;
+            thread = Thread.currentThread();
         }
-        finished = false;
-        finish = false;
 
         final byte[] buf = new byte[bufferSize];
 
         int length;
         try {
-            do {
-                while (in.available() > 0) {
-                    length = in.read(buf);
-                    if (length < 1 ) {
-                        break;
-                    }
-                    out.write(buf, 0, length);
-                    if (autoflush) {
-                        out.flush();
+            while (true) {
+                if (nonBlocking) {
+                    while (in.available() > 0) {
+                        length = in.read(buf);
+                        if (length > 0) {
+                            out.write(buf, 0, length);
+                            if (autoflush) {
+                                out.flush();
+                            }
+                        } else {
+                            break;
+                        }
                     }
+                    Thread.sleep(50); // Pause to avoid tight loop if external 
proc is too slow
+                } else {
+                    do {
+                        length = in.read(buf);
+                        if (length > 0) {
+                            out.write(buf, 0, length);
+                            if (autoflush) {
+                                out.flush();
+                            }
+                        }
+                    } while (length > 0);
                 }
-                out.flush();
-                Thread.sleep(200);  // Pause to avoid tight loop if external 
proc is slow
-            } while (!finish);
+                boolean finish;
+                synchronized (this) {
+                    finish = this.finish;
+                }
+                if (finish) {
+                    break;
+                }
+            }
         }
         catch (Throwable t) {
             synchronized (this) {
@@ -118,14 +158,16 @@ public class StreamPumper implements Run
             }
         }
         finally {
+            try {
+                out.flush();
+            } catch (IOException e) { }
             if (closeWhenExhausted) {
                 try {
                     out.close();
                 } catch (IOException e) { }
             }
-            finished = true;
-
             synchronized (this) {
+                finished = true;
                 notifyAll();
             }
         }
@@ -136,7 +178,7 @@ public class StreamPumper implements Run
      *
      * @return true     If the stream has been exhausted.
      */
-    public boolean isFinished() {
+    public synchronized boolean isFinished() {
         return finished;
     }
 
@@ -192,7 +234,9 @@ public class StreamPumper implements Run
      */
     public synchronized void stop() {
         finish = true;
-
+        if (nonBlocking && thread != null && !finished) {
+            thread.interrupt();
+        }
         notifyAll();
     }
     


Reply via email to