Author: andygumbrecht
Date: Thu Jan 10 11:37:21 2013
New Revision: 1431290
URL: http://svn.apache.org/viewvc?rev=1431290&view=rev
Log:
Clean up and make ServicePool more robust.
Close sockets.
Modified:
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
Modified:
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
(original)
+++
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Thu Jan 10 11:37:21 2013
@@ -107,7 +107,7 @@ public class KeepAliveServer implements
backlog--;
try {
- session.socket.close();
+ session.close();
} catch (Throwable e) {
//Ignore
} finally {
@@ -136,7 +136,7 @@ public class KeepAliveServer implements
if (l.tryLock()) {
try {
- session.socket.close();
+ session.close();
} catch (Throwable e) {
//Ignore
} finally {
@@ -200,20 +200,20 @@ public class KeepAliveServer implements
// only used inside the Lock
private final Socket socket;
+ private InputStream in = null;
+ private OutputStream out = null;
- protected Session(final KeepAliveServer kas, final Socket socket) {
+ private Session(final KeepAliveServer kas, final Socket socket) {
this.kas = kas;
this.socket = socket;
this.lastRequest = new AtomicLong(System.currentTimeMillis());
this.thread = Thread.currentThread();
}
- protected void service() throws ServiceException, IOException {
+ private void service() throws ServiceException, IOException {
this.kas.addSession(this);
int i = -1;
- InputStream in = null;
- OutputStream out = null;
try {
@@ -222,11 +222,11 @@ public class KeepAliveServer implements
try {
if (!KeepAliveServer.this.gzip) {
- in = new
BufferedInputStream(this.socket.getInputStream());
- out = new
BufferedOutputStream(this.socket.getOutputStream());
+ in = new BufferedInputStream(socket.getInputStream());
+ out = new
BufferedOutputStream(socket.getOutputStream());
} else {
- in = new GZIPInputStream(new
BufferedInputStream(this.socket.getInputStream()));
- out = new BufferedOutputStream(new
FlushableGZIPOutputStream(this.socket.getOutputStream()));
+ in = new GZIPInputStream(new
BufferedInputStream(socket.getInputStream()));
+ out = new BufferedOutputStream(new
FlushableGZIPOutputStream(socket.getOutputStream()));
}
} finally {
l1.unlock();
@@ -280,31 +280,35 @@ public class KeepAliveServer implements
Thread.interrupted();
} finally {
- if (null != in) {
- try {
- in.close();
- } catch (Throwable e) {
- //ignore
- }
- }
+ close();
- if (null != out) {
- try {
- out.close();
- } catch (Throwable e) {
- //ignore
- }
+ this.kas.removeSession(this);
+ }
+ }
+
+ private void close() {
+ if (null != in) {
+ try {
+ in.close();
+ } catch (Throwable e) {
+ //ignore
}
+ }
- if (null != this.socket) {
- try {
- this.socket.close();
- } catch (Throwable e) {
- //ignore
- }
+ if (null != out) {
+ try {
+ out.close();
+ } catch (Throwable e) {
+ //ignore
}
+ }
- this.kas.removeSession(this);
+ if (null != socket) {
+ try {
+ socket.close();
+ } catch (Throwable e) {
+ //ignore
+ }
}
}
}
Modified:
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
(original)
+++
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServerServiceFilter.java
Thu Jan 10 11:37:21 2013
@@ -35,39 +35,47 @@ public class ServerServiceFilter impleme
@Managed
private final ServerService service;
- public ServerServiceFilter(ServerService service) {
+ public ServerServiceFilter(final ServerService service) {
this.service = service;
}
+ @Override
public String getIP() {
return service.getIP();
}
+ @Override
public String getName() {
return service.getName();
}
+ @Override
public int getPort() {
return service.getPort();
}
- public void service(InputStream in, OutputStream out) throws
ServiceException, IOException {
+ @Override
+ public void service(final InputStream in, final OutputStream out) throws
ServiceException, IOException {
service.service(in, out);
}
- public void service(Socket socket) throws ServiceException, IOException {
+ @Override
+ public void service(final Socket socket) throws ServiceException,
IOException {
service.service(socket);
}
+ @Override
public void start() throws ServiceException {
service.start();
}
+ @Override
public void stop() throws ServiceException {
service.stop();
}
- public void init(Properties props) throws Exception {
+ @Override
+ public void init(final Properties props) throws Exception {
service.init(props);
}
}
Modified:
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
(original)
+++
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServiceLogger.java
Thu Jan 10 11:37:21 2013
@@ -37,12 +37,12 @@ public class ServiceLogger extends Serve
private Logger logger;
private boolean debug = false;
- public ServiceLogger(ServerService next) {
+ public ServiceLogger(final ServerService next) {
super(next);
}
@Override
- public void init(Properties props) throws Exception {
+ public void init(final Properties props) throws Exception {
this.logger =
Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("service." +
getName()), "org.apache.openejb.server.util.resources");
this.debug = this.logger.isDebugEnabled();
@@ -51,7 +51,7 @@ public class ServiceLogger extends Serve
}
@Override
- public void service(InputStream in, OutputStream out) throws
ServiceException, IOException {
+ public void service(final InputStream in, final OutputStream out) throws
ServiceException, IOException {
throw new UnsupportedOperationException("service(in,out)");
}
@@ -63,7 +63,7 @@ public class ServiceLogger extends Serve
MDBput = MDC.getMethod("put", String.class, String.class);
} catch (Exception e) { // no need to log it with a higher level
Logger.getInstance(LogCategory.OPENEJB,
ServiceLogger.class.getName())
- .debug("can't find log4j MDC class");
+ .debug("can't find log4j MDC class");
}
}
@@ -78,9 +78,9 @@ public class ServiceLogger extends Serve
}
@Override
- public void service(Socket socket) throws ServiceException, IOException {
+ public void service(final Socket socket) throws ServiceException,
IOException {
- InetAddress client = socket.getInetAddress();
+ final InetAddress client = socket.getInetAddress();
final String address = client.getHostAddress();
final String name = this.getName();
Modified:
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
(original)
+++
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
Thu Jan 10 11:37:21 2013
@@ -28,42 +28,101 @@ import java.io.OutputStream;
import java.net.Socket;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
@Managed
public class ServicePool extends ServerServiceFilter {
+
private static final Logger log =
Logger.getInstance(LogCategory.SERVICEPOOL,
"org.apache.openejb.util.resources");
private final ThreadPoolExecutor threadPool;
private final AtomicBoolean stop = new AtomicBoolean();
public ServicePool(final ServerService next, final Properties properties) {
- this(next, new Options(properties).get("threads", 100));
+ //Liberal defaults
+ this(next, new Options(properties).get("threads", 50), new
Options(properties).get("queue", 50000), new Options(properties).get("block",
false));
}
- public ServicePool(final ServerService next, final int threads) {
+ public ServicePool(final ServerService next, int threads, int queue, final
boolean block) {
super(next);
- final int keepAliveTime = (1000 * 60 * 5);
+ if (threads < 1) {
+ threads = 1;
+ }
+
+ if (queue < 1) {
+ queue = 1;
+ }
+
+ /**
+ This thread pool starts with 2 core threads and can grow to the limit
defined by 'threads'.
+ If a pool thread is idle for more than 1 minute it will be discarded,
unless the core size is reached.
+ It can accept up to the number of processes defined by 'queue'.
+ If the queue is full then an attempt is made to add the process to
the queue for 10 seconds.
+ Failure to add to the queue in this time will either result in a
logged rejection, or if 'block'
+ is true then a final attempt is made to run the process in the
current thread (the service thread).
+ */
- threadPool = new ThreadPoolExecutor(threads, threads, keepAliveTime,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ threadPool = new ThreadPoolExecutor(2, threads, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(queue));
threadPool.setThreadFactory(new ThreadFactory() {
- private volatile int id = 0;
+
+ private final AtomicInteger i = new AtomicInteger(0);
@Override
public Thread newThread(final Runnable arg0) {
- return new Thread(arg0, getName() + " " + getNextID());
- }
+ final Thread t = new Thread(arg0, "OpenEJB." + getName() + "."
+ i.incrementAndGet());
+
+ t.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final
Throwable e) {
+ log.error("Uncaught error in: " + t.getName(), e);
+ }
+ });
- private int getNextID() {
- return id++;
+ return t;
}
});
+ threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor tpe) {
+
+ if (null == r || null == tpe || tpe.isShutdown() ||
tpe.isTerminated() || tpe.isTerminating()) {
+ return;
+ }
+
+ if (log.isWarningEnabled()) {
+ log.warning("ServicePool at capicity for process: " + r);
+ }
+
+ boolean offer = false;
+ try {
+ offer = tpe.getQueue().offer(r, 10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ //Ignore
+ }
+
+ if (!offer) {
+ log.error("ServicePool failed to run asynchronous process:
" + r);
+
+ if (block) {
+ try {
+ //Last ditch effort to run the process in the
current thread
+ r.run();
+ } catch (Throwable e) {
+ log.error("ServicePool failed to run synchronous
process: " + r);
+ }
+ }
+ }
+ }
+ });
+
SystemInstance.get().setComponent(ServicePool.class, this);
}
@@ -77,12 +136,24 @@ public class ServicePool extends ServerS
@Override
public void service(final Socket socket) throws ServiceException,
IOException {
- final Runnable service = new Runnable() {
+
+ final ClassLoader tccl =
Thread.currentThread().getContextClassLoader();
+ final Runnable ctxCL = new Runnable() {
@Override
public void run() {
+
+ ClassLoader cl = null;
+
try {
- if (stop.get()) return;
+ cl = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(tccl);
+
+ if (stop.get()) {
+ return;
+ }
+
ServicePool.super.service(socket);
+
} catch (SecurityException e) {
final String msg = "ServicePool: Security error: " +
e.getMessage();
if (log.isDebugEnabled()) {
@@ -104,14 +175,12 @@ public class ServicePool extends ServerS
} else {
log.error(msg + " - Debug for StackTrace");
}
+
} finally {
+
+ //Ensure delegated socket is closed here
+
try {
- // Once the thread is done with the socket, clean it up
- // The ServiceDaemon does not close the sockets as it
is
- // single threaded and only accepts sockets and then
- // hands them off to be proceeceed. As the thread
doing
- // that processing it is our job to close the socket
- // when we are finished with it.
if (socket != null) {
socket.close();
}
@@ -123,19 +192,7 @@ public class ServicePool extends ServerS
log.warning(msg);
}
}
- }
- }
- };
- final ClassLoader tccl =
Thread.currentThread().getContextClassLoader();
- final Runnable ctxCL = new Runnable() {
- @Override
- public void run() {
- final ClassLoader cl =
Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(tccl);
- try {
- service.run();
- } finally {
Thread.currentThread().setContextClassLoader(cl);
}
}
@@ -149,6 +206,7 @@ public class ServicePool extends ServerS
@Managed(append = true)
public class Pool {
+
@Managed
public boolean isShutdown() {
return threadPool.isShutdown();
@@ -180,7 +238,7 @@ public class ServicePool extends ServerS
}
@Managed
- public long getKeepAliveTime(TimeUnit unit) {
+ public long getKeepAliveTime(final TimeUnit unit) {
return threadPool.getKeepAliveTime(unit);
}
@@ -210,22 +268,22 @@ public class ServicePool extends ServerS
}
@Managed
- public void setMaximumPoolSize(int maximumPoolSize) {
+ public void setMaximumPoolSize(final int maximumPoolSize) {
threadPool.setMaximumPoolSize(maximumPoolSize);
}
@Managed
- public void setCorePoolSize(int corePoolSize) {
+ public void setCorePoolSize(final int corePoolSize) {
getThreadPool().setCorePoolSize(corePoolSize);
}
@Managed
- public void allowCoreThreadTimeOut(boolean value) {
+ public void allowCoreThreadTimeOut(final boolean value) {
getThreadPool().allowCoreThreadTimeOut(value);
}
@Managed(description = "Sets time in nanoseconds")
- public void setKeepAliveTime(long time) {
+ public void setKeepAliveTime(final long time) {
getThreadPool().setKeepAliveTime(time, TimeUnit.NANOSECONDS);
}
}
Modified:
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java?rev=1431290&r1=1431289&r2=1431290&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
(original)
+++
openejb/trunk/openejb/server/openejb-server/src/main/java/org/apache/openejb/server/Status.java
Thu Jan 10 11:37:21 2013
@@ -24,7 +24,7 @@ import java.net.Socket;
public class Status {
public static void main(String[] args) {
-// System.exit(new Start().start()?0:1);
+ // System.exit(new Start().start()?0:1);
new Status().status();
}
@@ -43,10 +43,14 @@ public class Status {
}
private boolean connect(int tries) {
+
+ Socket socket = null;
+ OutputStream out = null;
+
try {
final int port =
SystemInstance.get().getOptions().get("ejbd.port", 4201);
- Socket socket = new Socket("localhost", port);
- OutputStream out = socket.getOutputStream();
+ socket = new Socket("localhost", port);
+ out = socket.getOutputStream();
} catch (Exception e) {
if (tries < 2) {
return false;
@@ -58,6 +62,22 @@ public class Status {
}
return connect(--tries);
}
+ } finally {
+ if (null != out) {
+ try {
+ out.close();
+ } catch (Throwable t) {
+ //Ignore
+ }
+ }
+
+ if (null != socket) {
+ try {
+ socket.close();
+ } catch (Throwable t) {
+ //Ignore
+ }
+ }
}
return true;