Author: andygumbrecht
Date: Thu Jan 10 11:37:21 2013
New Revision: 1431290

URL: http://svn.apache.org/viewvc?rev=1431290&view=rev
Log:
Cleanup and make ServicePool more robust.
Close sockets.

Modified:
    
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
    
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
    
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
    
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
    
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java

Modified: 
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
 Thu Jan 10 11:37:21 2013
@@ -107,7 +107,7 @@ public class KeepAliveServer implements 
                         backlog--;
 
                         try {
-                            session.socket.close();
+                            session.close();
                         } catch (Throwable e) {
                             //Ignore
                         } finally {
@@ -136,7 +136,7 @@ public class KeepAliveServer implements 
 
             if (l.tryLock()) {
                 try {
-                    session.socket.close();
+                    session.close();
                 } catch (Throwable e) {
                     //Ignore
                 } finally {
@@ -200,20 +200,20 @@ public class KeepAliveServer implements 
 
         // only used inside the Lock
         private final Socket socket;
+        private InputStream in = null;
+        private OutputStream out = null;
 
-        protected Session(final KeepAliveServer kas, final Socket socket) {
+        private Session(final KeepAliveServer kas, final Socket socket) {
             this.kas = kas;
             this.socket = socket;
             this.lastRequest = new AtomicLong(System.currentTimeMillis());
             this.thread = Thread.currentThread();
         }
 
-        protected void service() throws ServiceException, IOException {
+        private void service() throws ServiceException, IOException {
             this.kas.addSession(this);
 
             int i = -1;
-            InputStream in = null;
-            OutputStream out = null;
 
             try {
 
@@ -222,11 +222,11 @@ public class KeepAliveServer implements 
 
                 try {
                     if (!KeepAliveServer.this.gzip) {
-                        in = new 
BufferedInputStream(this.socket.getInputStream());
-                        out = new 
BufferedOutputStream(this.socket.getOutputStream());
+                        in = new BufferedInputStream(socket.getInputStream());
+                        out = new 
BufferedOutputStream(socket.getOutputStream());
                     } else {
-                        in = new GZIPInputStream(new 
BufferedInputStream(this.socket.getInputStream()));
-                        out = new BufferedOutputStream(new 
FlushableGZIPOutputStream(this.socket.getOutputStream()));
+                        in = new GZIPInputStream(new 
BufferedInputStream(socket.getInputStream()));
+                        out = new BufferedOutputStream(new 
FlushableGZIPOutputStream(socket.getOutputStream()));
                     }
                 } finally {
                     l1.unlock();
@@ -280,31 +280,35 @@ public class KeepAliveServer implements 
                 Thread.interrupted();
             } finally {
 
-                if (null != in) {
-                    try {
-                        in.close();
-                    } catch (Throwable e) {
-                        //ignore
-                    }
-                }
+                close();
 
-                if (null != out) {
-                    try {
-                        out.close();
-                    } catch (Throwable e) {
-                        //ignore
-                    }
+                this.kas.removeSession(this);
+            }
+        }
+
+        private void close() {
+            if (null != in) {
+                try {
+                    in.close();
+                } catch (Throwable e) {
+                    //ignore
                 }
+            }
 
-                if (null != this.socket) {
-                    try {
-                        this.socket.close();
-                    } catch (Throwable e) {
-                        //ignore
-                    }
+            if (null != out) {
+                try {
+                    out.close();
+                } catch (Throwable e) {
+                    //ignore
                 }
+            }
 
-                this.kas.removeSession(this);
+            if (null != socket) {
+                try {
+                    socket.close();
+                } catch (Throwable e) {
+                    //ignore
+                }
             }
         }
     }

Modified: 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
 Thu Jan 10 11:37:21 2013
@@ -35,39 +35,47 @@ public class ServerServiceFilter impleme
     @Managed
     private final ServerService service;
 
-    public ServerServiceFilter(ServerService service) {
+    public ServerServiceFilter(final ServerService service) {
         this.service = service;
     }
 
+    @Override
     public String getIP() {
         return service.getIP();
     }
 
+    @Override
     public String getName() {
         return service.getName();
     }
 
+    @Override
     public int getPort() {
         return service.getPort();
     }
 
-    public void service(InputStream in, OutputStream out) throws 
ServiceException, IOException {
+    @Override
+    public void service(final InputStream in, final OutputStream out) throws 
ServiceException, IOException {
         service.service(in, out);
     }
 
-    public void service(Socket socket) throws ServiceException, IOException {
+    @Override
+    public void service(final Socket socket) throws ServiceException, 
IOException {
         service.service(socket);
     }
 
+    @Override
     public void start() throws ServiceException {
         service.start();
     }
 
+    @Override
     public void stop() throws ServiceException {
         service.stop();
     }
 
-    public void init(Properties props) throws Exception {
+    @Override
+    public void init(final Properties props) throws Exception {
         service.init(props);
     }
 }

Modified: 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
 Thu Jan 10 11:37:21 2013
@@ -37,12 +37,12 @@ public class ServiceLogger extends Serve
     private Logger logger;
     private boolean debug = false;
 
-    public ServiceLogger(ServerService next) {
+    public ServiceLogger(final ServerService next) {
         super(next);
     }
 
     @Override
-    public void init(Properties props) throws Exception {
+    public void init(final Properties props) throws Exception {
 
         this.logger = 
Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("service." + 
getName()), "org.apache.openejb.server.util.resources");
         this.debug = this.logger.isDebugEnabled();
@@ -51,7 +51,7 @@ public class ServiceLogger extends Serve
     }
 
     @Override
-    public void service(InputStream in, OutputStream out) throws 
ServiceException, IOException {
+    public void service(final InputStream in, final OutputStream out) throws 
ServiceException, IOException {
         throw new UnsupportedOperationException("service(in,out)");
     }
 
@@ -63,7 +63,7 @@ public class ServiceLogger extends Serve
             MDBput = MDC.getMethod("put", String.class, String.class);
         } catch (Exception e) { // no need to log it with a higher level
             Logger.getInstance(LogCategory.OPENEJB, 
ServiceLogger.class.getName())
-                    .debug("can't find log4j MDC class");
+                  .debug("can't find log4j MDC class");
         }
     }
 
@@ -78,9 +78,9 @@ public class ServiceLogger extends Serve
     }
 
     @Override
-    public void service(Socket socket) throws ServiceException, IOException {
+    public void service(final Socket socket) throws ServiceException, 
IOException {
 
-        InetAddress client = socket.getInetAddress();
+        final InetAddress client = socket.getInetAddress();
         final String address = client.getHostAddress();
         final String name = this.getName();
 

Modified: 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
 Thu Jan 10 11:37:21 2013
@@ -28,42 +28,101 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Managed
 public class ServicePool extends ServerServiceFilter {
+
     private static final Logger log = 
Logger.getInstance(LogCategory.SERVICEPOOL, 
"org.apache.openejb.util.resources");
 
     private final ThreadPoolExecutor threadPool;
     private final AtomicBoolean stop = new AtomicBoolean();
 
     public ServicePool(final ServerService next, final Properties properties) {
-        this(next, new Options(properties).get("threads", 100));
+        //Liberal defaults
+        this(next, new Options(properties).get("threads", 50), new 
Options(properties).get("queue", 50000), new Options(properties).get("block", 
false));
     }
 
-    public ServicePool(final ServerService next, final int threads) {
+    public ServicePool(final ServerService next, int threads, int queue, final 
boolean block) {
         super(next);
 
-        final int keepAliveTime = (1000 * 60 * 5);
+        if (threads < 1) {
+            threads = 1;
+        }
+
+        if (queue < 1) {
+            queue = 1;
+        }
+
+        /**
+         This thread pool starts with 2 core threads and can grow to the limit 
defined by 'threads'.
+         If a pool thread is idle for more than 1 minute it will be discarded, 
unless the core size is reached.
+         It can accept upto the number of processes defined by 'queue'.
+         If the queue is full then an attempt is made to add the process to 
the queue for 10 seconds.
+         Failure to add to the queue in this time will either result in a 
logged rejection, or if 'block'
+         is true then a final attempt is made to run the process in the 
current thread (the service thread).
+         */
 
-        threadPool = new ThreadPoolExecutor(threads, threads, keepAliveTime, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        threadPool = new ThreadPoolExecutor(2, threads, 1, TimeUnit.MINUTES, 
new LinkedBlockingQueue<Runnable>(queue));
         threadPool.setThreadFactory(new ThreadFactory() {
-            private volatile int id = 0;
+
+            private final AtomicInteger i = new AtomicInteger(0);
 
             @Override
             public Thread newThread(final Runnable arg0) {
-                return new Thread(arg0, getName() + " " + getNextID());
-            }
+                final Thread t = new Thread(arg0, "OpenEJB." + getName() + "." 
+ i.incrementAndGet());
+
+                t.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
+                    @Override
+                    public void uncaughtException(final Thread t, final 
Throwable e) {
+                        log.error("Uncaught error in: " + t.getName(), e);
+                    }
+                });
 
-            private int getNextID() {
-                return id++;
+                return t;
             }
 
         });
 
+        threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+            @Override
+            public void rejectedExecution(final Runnable r, final 
ThreadPoolExecutor tpe) {
+
+                if (null == r || null == tpe || tpe.isShutdown() || 
tpe.isTerminated() || tpe.isTerminating()) {
+                    return;
+                }
+
+                if (log.isWarningEnabled()) {
+                    log.warning("ServicePool at capicity for process: " + r);
+                }
+
+                boolean offer = false;
+                try {
+                    offer = tpe.getQueue().offer(r, 10, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    //Ignore
+                }
+
+                if (!offer) {
+                    log.error("ServicePool failed to run asynchronous process: 
" + r);
+
+                    if (block) {
+                        try {
+                            //Last ditch effort to run the process in the 
current thread
+                            r.run();
+                        } catch (Throwable e) {
+                            log.error("ServicePool failed to run synchronous 
process: " + r);
+                        }
+                    }
+                }
+            }
+        });
+
         SystemInstance.get().setComponent(ServicePool.class, this);
     }
 
@@ -77,12 +136,24 @@ public class ServicePool extends ServerS
 
     @Override
     public void service(final Socket socket) throws ServiceException, 
IOException {
-        final Runnable service = new Runnable() {
+
+        final ClassLoader tccl = 
Thread.currentThread().getContextClassLoader();
+        final Runnable ctxCL = new Runnable() {
             @Override
             public void run() {
+
+                ClassLoader cl = null;
+
                 try {
-                    if (stop.get()) return;
+                    cl = Thread.currentThread().getContextClassLoader();
+                    Thread.currentThread().setContextClassLoader(tccl);
+
+                    if (stop.get()) {
+                        return;
+                    }
+
                     ServicePool.super.service(socket);
+
                 } catch (SecurityException e) {
                     final String msg = "ServicePool: Security error: " + 
e.getMessage();
                     if (log.isDebugEnabled()) {
@@ -104,14 +175,12 @@ public class ServicePool extends ServerS
                     } else {
                         log.error(msg + " - Debug for StackTrace");
                     }
+
                 } finally {
+
+                    //Ensure delegated socket is closed here
+
                     try {
-                        // Once the thread is done with the socket, clean it up
-                        // The ServiceDaemon does not close the sockets as it 
is
-                        // single threaded and only accepts sockets and then
-                        // hands them off to be proceeceed.  As the thread 
doing
-                        // that processing it is our job to close the socket
-                        // when we are finished with it.
                         if (socket != null) {
                             socket.close();
                         }
@@ -123,19 +192,7 @@ public class ServicePool extends ServerS
                             log.warning(msg);
                         }
                     }
-                }
-            }
-        };
 
-        final ClassLoader tccl = 
Thread.currentThread().getContextClassLoader();
-        final Runnable ctxCL = new Runnable() {
-            @Override
-            public void run() {
-                final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
-                Thread.currentThread().setContextClassLoader(tccl);
-                try {
-                    service.run();
-                } finally {
                     Thread.currentThread().setContextClassLoader(cl);
                 }
             }
@@ -149,6 +206,7 @@ public class ServicePool extends ServerS
 
     @Managed(append = true)
     public class Pool {
+
         @Managed
         public boolean isShutdown() {
             return threadPool.isShutdown();
@@ -180,7 +238,7 @@ public class ServicePool extends ServerS
         }
 
         @Managed
-        public long getKeepAliveTime(TimeUnit unit) {
+        public long getKeepAliveTime(final TimeUnit unit) {
             return threadPool.getKeepAliveTime(unit);
         }
 
@@ -210,22 +268,22 @@ public class ServicePool extends ServerS
         }
 
         @Managed
-        public void setMaximumPoolSize(int maximumPoolSize) {
+        public void setMaximumPoolSize(final int maximumPoolSize) {
             threadPool.setMaximumPoolSize(maximumPoolSize);
         }
 
         @Managed
-        public void setCorePoolSize(int corePoolSize) {
+        public void setCorePoolSize(final int corePoolSize) {
             getThreadPool().setCorePoolSize(corePoolSize);
         }
 
         @Managed
-        public void allowCoreThreadTimeOut(boolean value) {
+        public void allowCoreThreadTimeOut(final boolean value) {
             getThreadPool().allowCoreThreadTimeOut(value);
         }
 
         @Managed(description = "Sets time in nanoseconds")
-        public void setKeepAliveTime(long time) {
+        public void setKeepAliveTime(final long time) {
             getThreadPool().setKeepAliveTime(time, TimeUnit.NANOSECONDS);
         }
     }

Modified: 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
 Thu Jan 10 11:37:21 2013
@@ -24,7 +24,7 @@ import java.net.Socket;
 public class Status {
 
     public static void main(String[] args) {
-//        System.exit(new Start().start()?0:1);
+        //        System.exit(new Start().start()?0:1);
         new Status().status();
     }
 
@@ -43,10 +43,14 @@ public class Status {
     }
 
     private boolean connect(int tries) {
+
+        Socket socket = null;
+        OutputStream out = null;
+
         try {
             final int port = 
SystemInstance.get().getOptions().get("ejbd.port", 4201);
-            Socket socket = new Socket("localhost", port);
-            OutputStream out = socket.getOutputStream();
+            socket = new Socket("localhost", port);
+            out = socket.getOutputStream();
         } catch (Exception e) {
             if (tries < 2) {
                 return false;
@@ -58,6 +62,22 @@ public class Status {
                 }
                 return connect(--tries);
             }
+        } finally {
+            if (null != out) {
+                try {
+                    out.close();
+                } catch (Throwable t) {
+                    //Ignore
+                }
+            }
+
+            if (null != socket) {
+                try {
+                    socket.close();
+                } catch (Throwable t) {
+                    //Ignore
+                }
+            }
         }
 
         return true;


Reply via email to