Author: asankha
Date: Fri Nov 27 09:59:57 2009
New Revision: 884809

URL: http://svn.apache.org/viewvc?rev=884809&view=rev
Log:
fix SYNAPSE-600

Modified:
    
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
    
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java

Modified: 
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: 
http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=884809&r1=884808&r2=884809&view=diff
==============================================================================
--- 
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
 (original)
+++ 
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
 Fri Nov 27 09:59:57 2009
@@ -40,7 +40,6 @@
 import org.apache.http.nio.params.NIOReactorPNames;
 import org.apache.http.nio.reactor.IOEventDispatch;
 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
-import org.apache.http.nio.reactor.ListenerEndpoint;
 import org.apache.http.params.BasicHttpParams;
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
@@ -49,6 +48,7 @@
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -242,83 +242,103 @@
         if (log.isDebugEnabled()) {
             log.debug("Starting Listener...");
         }
-        
-        // configure the IO reactor on the specified port
+        // start the Listener in a new seperate thread
+        Thread t = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    startServerEngine();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, "HttpCoreNIOListener");
+
+        t.start();
+        log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener starting on" +
+            (bindAddress != null ? " address : " + bindAddress : "") + " port : " + port);
+    }
+
+    /**
+     * configure and start the IO reactor on the specified port
+     */
+    private void startServerEngine() {
         HttpParams params = getServerParameters();
         try {
             String prefix = (sslContext == null ? "http" : "https") + 
"-Listener I/O dispatcher";
             ioReactor = new DefaultListeningIOReactor(
-                NHttpConfiguration.getInstance().getServerIOWorkers(),         
       
+                NHttpConfiguration.getInstance().getServerIOWorkers(),
                 new NativeThreadFactory(new ThreadGroup(prefix + " thread 
group"), prefix), params);
 
             ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
                 public boolean handle(IOException ioException) {
-                    log.warn("System may be unstable: IOReactor encountered a 
checked exception : "
-                            + ioException.getMessage(), ioException);
-                    return true;
+                    log.warn("System may be unstable: IOReactor encountered a 
checked exception : " +
+                        ioException.getMessage(), ioException);
+                    if (ioException instanceof BindException) {
+                        // bind failures considered OK to ignore
+                        return true;
+                    }
+                    return false;
                 }
 
                 public boolean handle(RuntimeException runtimeException) {
-                    log.warn("System may be unstable: IOReactor encountered a 
runtime exception : "
-                            + runtimeException.getMessage(), runtimeException);
-                    return true;
+                    log.warn("System may be unstable: IOReactor encountered a 
runtime exception : " +
+                        runtimeException.getMessage(), runtimeException);
+                    if (runtimeException instanceof 
UnsupportedOperationException) {
+                        // Unsupported operations considered OK to ignore
+                        return true;
+                    }
+                    return false;
                 }
             });
         } catch (IOException e) {
-            handleException("Error starting the IOReactor", e);
+            log.error("Error starting the IOReactor", e);
         }
 
-        for (Object obj : 
cfgCtx.getAxisConfiguration().getServices().values()) {
-            addToServiceURIMap((AxisService) obj);
-        }
-        
         handler = new ServerHandler(cfgCtx, params, sslContext != null, 
metrics);
-        final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
-                sslContext, sslIOSessionHandler, params);
+        IOEventDispatch ioEventDispatch = getEventDispatch(
+            handler, sslContext, sslIOSessionHandler, params);
+
         state = BaseConstants.STARTED;
-        
-        ListenerEndpoint endpoint;
+        boolean attemptAutoRestart = true;
         try {
             if (bindAddress == null) {
-                endpoint = ioReactor.listen(new InetSocketAddress(port));
+                ioReactor.listen(new InetSocketAddress(port));
             } else {
-                endpoint = ioReactor.listen(new InetSocketAddress(
+                ioReactor.listen(new InetSocketAddress(
                     InetAddress.getByName(bindAddress), port));
             }
+            ioReactor.execute(ioEventDispatch);
+            attemptAutoRestart = false;
+
+        } catch (InterruptedIOException ex) {
+            log.fatal("Reactor Interrupted");
         } catch (IOException e) {
-            handleException("Encountered an I/O error: " + e.getMessage(), e);
-            return;
+            log.fatal("Encountered an I/O error: " + e.getMessage(), e);
+        } catch (Exception e) {
+            log.fatal("Encountered a Runtime exception : " + e.getMessage(), 
e);
         }
-        
-        // start the IO reactor in a new separate thread
-        Thread t = new Thread(new Runnable() {
-            public void run() {
-                try {
-                    ioReactor.execute(ioEventDispatch);
-                } catch (InterruptedIOException ex) {
-                    log.fatal("Reactor Interrupted");
-                } catch (IOException e) {
-                    log.fatal("Encountered an I/O error: " + e.getMessage(), 
e);
-                } catch (Exception e) {
-                    log.fatal("Unexpected exception in I/O reactor", e);
+
+        if (attemptAutoRestart) {
+            log.info("IOReactor encountered a fatal exception. Attempting a re-start of the reactor");
+            try {
+                log.info("Attempt shutdown of the IOReactor if still running..");
+                ioReactor.shutdown();
+            } catch (IOException ignore) {}
+            handler.stop();
+
+            log.info("Restarting IOReactor.. ");
+            Thread t = new Thread(new Runnable() {
+                public void run() {
+                    startServerEngine();
                 }
-                log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener 
Shutdown");
-            }
-        }, "HttpCoreNIOListener");
+            }, "HttpCoreNIOListener");
+            t.start();
+            log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener auto 
re-starting");
 
-        t.start();
-        
-        // Wait for the endpoint to become ready, i.e. for the listener to 
start accepting
-        // requests.
-        try {
-            endpoint.waitFor();
-        } catch (InterruptedException e) {
-            log.warn("HttpCoreNIOListener#start() was interrupted");
+        } else {
+            log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener 
Shutdown");
         }
-        
-        log.info((sslContext == null ? "HTTP" : "HTTPS") + " Listener started 
on" +
-            (bindAddress != null ? " address : " + bindAddress : "") + " port 
: " + port);
-    }
+    }    
 
     private void addToServiceURIMap(AxisService service) {
         Parameter param = 
service.getParameter(NhttpConstants.SERVICE_URI_LOCATION);

Modified: 
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: 
http://svn.apache.org/viewvc/synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=884809&r1=884808&r2=884809&view=diff
==============================================================================
--- 
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
 (original)
+++ 
synapse/branches/1.3/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
 Fri Nov 27 09:59:57 2009
@@ -65,11 +65,13 @@
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.net.BindException;
 import java.util.*;
 
 /**
  * NIO transport sender for Axis2 based on HttpCore and NIO extensions
  */
+@SuppressWarnings({"JavadocReference"})
 public class HttpCoreNIOSender extends AbstractHandler implements 
TransportSender, ManagementSupport {
 
     private static final Log log = LogFactory.getLog(HttpCoreNIOSender.class);
@@ -109,7 +111,7 @@
      * @param transportOut the description of the http/s transport from Axis2 
configuration
      * @throws AxisFault thrown on an error
      */
-    public void init(ConfigurationContext cfgCtx, TransportOutDescription 
transportOut) throws AxisFault {
+    public void init(ConfigurationContext cfgCtx, final TransportOutDescription transportOut) throws AxisFault {
         this.cfgCtx = cfgCtx;
 
         // is this an SSL Sender?
@@ -151,44 +153,10 @@
             cfgCtx.setNonReplicableProperty("warnOnHTTP500", warnOnHttp500);
         }
 
-        HttpParams params = getClientParameters();
-        try {
-            String prefix = (sslContext == null ? "http" : "https") + "-Sender 
I/O dispatcher";
-            ioReactor = new DefaultConnectingIOReactor(
-                NHttpConfiguration.getInstance().getClientIOWorkers(),
-                new NativeThreadFactory(new ThreadGroup(prefix + " thread 
group"), prefix), params);
-            ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
-                public boolean handle(IOException ioException) {
-                    log.warn("System may be unstable: IOReactor encountered a 
checked exception : " +
-                        ioException.getMessage(), ioException);
-                    return true;
-                }
-
-                public boolean handle(RuntimeException runtimeException) {
-                    log.warn("System may be unstable: IOReactor encountered a 
runtime exception : " +
-                        runtimeException.getMessage(), runtimeException);
-                    return true;
-                }
-            });
-        } catch (IOException e) {
-            log.error("Error starting the IOReactor", e);
-        }
-
-        handler = new ClientHandler(cfgCtx, params, metrics);
-        final IOEventDispatch ioEventDispatch = getEventDispatch(
-            handler, sslContext, sslIOSessionHandler, params, transportOut);
-
         // start the Sender in a new seperate thread
         Thread t = new Thread(new Runnable() {
             public void run() {
-                try {
-                    ioReactor.execute(ioEventDispatch);
-                } catch (InterruptedIOException ex) {
-                    log.fatal("Reactor Interrupted");
-                } catch (IOException e) {
-                    log.fatal("Encountered an I/O error: " + e.getMessage(), 
e);
-                }
-                log.info((sslContext == null ? "HTTP" : "HTTPS") + " Sender 
Shutdown");
+                executeClientEngine(transportOut);
             }
         }, "HttpCoreNIOSender");
         t.start();
@@ -521,6 +489,86 @@
     }
 
     /**
+     * Configure and start the IOReactor
+     */
+    private void executeClientEngine(final TransportOutDescription transportOut) {
+
+        HttpParams params = getClientParameters();
+        try {
+            String prefix = (sslContext == null ? "http" : "https") + "-Sender 
I/O dispatcher";
+            ioReactor = new DefaultConnectingIOReactor(
+                NHttpConfiguration.getInstance().getClientIOWorkers(),
+                new NativeThreadFactory(new ThreadGroup(prefix + " thread 
group"), prefix), params);
+            ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
+                public boolean handle(IOException ioException) {
+                    log.warn("System may be unstable: IOReactor encountered a 
checked exception : " +
+                        ioException.getMessage(), ioException);
+                    if (ioException instanceof BindException) {
+                        // bind failures considered OK to ignore
+                        return true;
+                    }
+                    return false;
+                }
+
+                public boolean handle(RuntimeException runtimeException) {
+                    log.warn("System may be unstable: IOReactor encountered a 
runtime exception : " +
+                        runtimeException.getMessage(), runtimeException);
+                    if (runtimeException instanceof 
UnsupportedOperationException) {
+                        // Unsupported operations considered OK to ignore
+                        return true;
+                    }
+                    return false;
+                }
+            });
+        } catch (IOException e) {
+            log.error("Error starting the IOReactor", e);
+        }
+
+        handler = new ClientHandler(cfgCtx, params, metrics);
+        IOEventDispatch ioEventDispatch = null;
+        try {
+            ioEventDispatch = getEventDispatch(
+            handler, sslContext, sslIOSessionHandler, params, transportOut);
+        } catch (AxisFault axisFault) {
+            log.fatal("Error getting event dispatch");
+        }
+
+        state = BaseConstants.STARTED;
+        boolean attemptAutoRestart = true;
+        try {
+            ioReactor.execute(ioEventDispatch);
+            attemptAutoRestart = false;
+        } catch (InterruptedIOException ex) {
+            log.fatal("Reactor Interrupted");
+        } catch (IOException e) {
+            log.fatal("Encountered an I/O error: " + e.getMessage(), e);
+        } catch (Exception e) {
+            log.fatal("Encountered an error: " + e.getMessage(), e);
+        }
+
+        if (attemptAutoRestart) {
+            log.info("IOReactor encountered a fatal exception. Attempting a re-start of the reactor");
+            try {
+                log.info("Attempt shutdown of the IOReactor if still running..");
+                ioReactor.shutdown();
+            } catch (IOException ignore) {}
+            handler.stop();
+
+            log.info("Restarting IOReactor.. ");
+            Thread t = new Thread(new Runnable() {
+                public void run() {
+                    executeClientEngine(transportOut);
+                }
+            }, "HttpCoreNIOSender");
+            t.start();
+            log.info((sslContext == null ? "HTTP" : "HTTPS") + " Sender auto 
re-starting");
+
+        } else {
+            log.info((sslContext == null ? "HTTP" : "HTTPS") + " Sender 
Shutdown");
+        }
+    }
+
+    /**
      * Determine the HttpStatusCodedepending on the message type processed <br>
      * (normal response versus fault response) as well as Axis2 message 
context properties set
      * via Synapse configuration or MessageBuilders.


Reply via email to