Author: cutting
Date: Fri Jan  6 13:35:35 2006
New Revision: 366571

URL: http://svn.apache.org/viewcvs?rev=366571&view=rev
Log:
Fix for NUTCH-151: CommandRunner can hang after the main thread exec
is finished and has an inefficient busy loop.

  I encountered a case where the JVM of a Tasktracker child did not exit
after the main thread returned; a thread dump showed only the threads named
STDOUT and STDERR from CommandRunner as non-daemon threads, and both were
doing a read().  CommandRunner also had an excessively costly busy loop.
These problems were fixed by:
1. The pipe io threads should be daemons.
2. The main thread should always interrupt() the pipe io threads when
   finishing up, not just when a timeout occurs.
3. Sleep before testing whether the process has finished with
   Process.exitValue().
4. Increased the sleep time to be 1000msec.


By Paul Baclace.

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/util/CommandRunner.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/util/CommandRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/util/CommandRunner.java?rev=366571&r1=366570&r2=366571&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/util/CommandRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/util/CommandRunner.java Fri Jan  6 13:35:35 2006
@@ -18,6 +18,7 @@
  * Adopted by John Xing for Nutch Project from
  * http://blog.fivesight.com/prb/space/Call+an+External+Command+from+Java/,
  * which explains the code in detail.
+ * [Original author is moving his site to http://mult.ifario.us/   -peb]
  *
  * Comments by John Xing on 20040621:
  * (1) EDU.oswego.cs.dl.util.concurrent.* is in j2sdk 1.5 now.
@@ -31,6 +32,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.InterruptedIOException;
 
 import EDU.oswego.cs.dl.util.concurrent.BrokenBarrierException;
 import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;
@@ -80,40 +82,47 @@
   }
 
   public void evaluate() throws IOException {
-    Process proc = Runtime.getRuntime().exec(_command);
+      this.exec();
+  }
 
+  /**
+   *
+   * @return process exit value (return code) or -1 if timed out.
+   * @throws IOException
+   */
+  public int exec() throws IOException {
+    Process proc = Runtime.getRuntime().exec(_command);
     _barrier = new CyclicBarrier(3 + ((_stdin != null) ? 1 : 0));
 
     PullerThread so =
       new PullerThread("STDOUT", proc.getInputStream(), _stdout);
+    so.setDaemon(true);
     so.start();
 
     PullerThread se =
       new PullerThread("STDERR", proc.getErrorStream(), _stderr);
+    se.setDaemon(true);
     se.start();
 
     PusherThread si = null;
     if (_stdin != null) {
       si = new PusherThread("STDIN", _stdin, proc.getOutputStream());
+      si.setDaemon(true);
       si.start();
     }
 
     boolean _timedout = false;
     long end = System.currentTimeMillis() + _timeout * 1000;
 
+    //
     try {
       if (_timeout == 0) {
-        _barrier.barrier();
+        _barrier.barrier(); // JDK 1.5: // _barrier.await();
       } else {
-        _barrier.attemptBarrier(_timeout * 1000);
+        _barrier.attemptBarrier(_timeout * 1000); // JDK 1.5: // _barrier.await(_timeout, TimeUnit.SECONDS);
       }
     } catch (TimeoutException ex) {
       _timedout = true;
-      if (si != null) {
-        si.interrupt();
-      }
-      so.interrupt();
-      se.interrupt();
       if (_destroyOnTimeout) {
         proc.destroy();
       }
@@ -123,16 +132,27 @@
       /* IGNORE */
     }
 
+    // tell the io threads we are finished
+    if (si != null) {
+      si.interrupt();
+    }
+    so.interrupt();
+    se.interrupt();
+
     _xit = -1;
 
     if (!_timedout) {
       if (_waitForExit) {
         do {
           try {
+            Thread.sleep(1000);
             _xit = proc.exitValue();
-            Thread.sleep(250);
           } catch (InterruptedException ie) {
-            /* IGNORE */
+              if (Thread.interrupted()) {
+                  break; // stop waiting on an interrupt for this thread
+              } else {
+                  continue;
+              }
           } catch (IllegalThreadStateException iltse) {
             continue;
           }
@@ -152,6 +172,7 @@
         proc.destroy();
       }
     }
+    return _xit;
   }
 
   public Throwable getThrownError() {
@@ -163,8 +184,6 @@
     private OutputStream _os;
     private InputStream _is;
 
-    private volatile boolean _kaput;
-
     private boolean _closeInput;
 
     protected PumperThread(
@@ -179,7 +198,6 @@
     }
 
     public void run() {
-      _kaput = false;
       try {
         byte[] buf = new byte[BUF];
         int read = 0;
@@ -189,9 +207,10 @@
           _os.write(buf, 0, read);
           _os.flush();
         }
+      } catch (InterruptedIOException iioe) {
+        // ignored
       } catch (Throwable t) {
         _thrownError = t;
-        return;
       } finally {
         try {
           if (_closeInput) {
@@ -203,13 +222,6 @@
           /* IGNORE */
         }
       }
-      try {
-        _barrier.barrier();
-      } catch (InterruptedException ie) {
-        /* IGNORE */
-      } catch (BrokenBarrierException bbe) {
-        /* IGNORE */
-      }
     }
   }
 
@@ -254,7 +266,7 @@
     String filePath = null;
     int timeout = 10;
 
-    String usage = "Usage: CommandRunner [-timeout timeout] commandPath filePath";
+    String usage = "Usage: CommandRunner [-timeout timeoutSecs] commandPath filePath";
 
     if (args.length < 2) {
       System.err.println(usage);


Reply via email to