Author: andygumbrecht
Date: Tue Dec 11 09:51:09 2012
New Revision: 1420056
URL: http://svn.apache.org/viewvc?rev=1420056&view=rev
Log:
Fix https://issues.apache.org/jira/browse/OPENEJB-1967
Also lock should be obtained 'before' tf block.
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java?rev=1420056&r1=1420055&r2=1420056&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
(original)
+++
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/ConnectionPoolTimeoutException.java
Tue Dec 11 09:51:09 2012
@@ -25,11 +25,11 @@ public class ConnectionPoolTimeoutExcept
public ConnectionPoolTimeoutException() {
}
- public ConnectionPoolTimeoutException(String s, Exception e) {
+ public ConnectionPoolTimeoutException(final String s, final Exception e) {
super(s, e);
}
- public ConnectionPoolTimeoutException(String s) {
+ public ConnectionPoolTimeoutException(final String s) {
super(s);
}
}
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java?rev=1420056&r1=1420055&r2=1420056&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
(original)
+++
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
Tue Dec 11 09:51:09 2012
@@ -58,14 +58,16 @@ public class SocketConnectionFactory imp
private int size = 5;
private long timeoutPool = 1000;
private int timeoutSocket = 500;
+ private int timeoutLinger;
private String[] enabledCipherSuites;
public SocketConnectionFactory() {
- this.size = getSize();
- this.timeoutPool = getTimeoutPool();
- this.timeoutSocket = getTimeoutSocket();
- this.enabledCipherSuites = getEnabledCipherSuites();
+ this.size = this.getSize();
+ this.timeoutPool = this.getTimeoutPool();
+ this.timeoutSocket = this.getTimeoutSocket();
+ this.timeoutLinger = this.getTimeoutLinger();
+ this.enabledCipherSuites = this.getEnabledCipherSuites();
try {
String property = System.getProperty(PROPERTY_KEEPALIVE);
if (property != null) {
@@ -93,6 +95,15 @@ public class SocketConnectionFactory imp
return timeout;
}
+ private int getTimeoutLinger() {
+ long pool = this.timeoutPool;
+ if (pool < 1000) {
+ pool = 1000;
+ }
+
+ return (int) (pool / 1000);
+ }
+
private int getTimeoutSocket() {
final Properties p = System.getProperties();
return getInt(p, SocketConnectionFactory.PROPERTY_SOCKET_TIMEOUT,
this.timeoutSocket);
@@ -130,7 +141,7 @@ public class SocketConnectionFactory imp
@Override
public Connection getConnection(final URI uri) throws java.io.IOException {
- final Pool pool = getPool(uri);
+ final Pool pool = this.getPool(uri);
SocketConnection conn = pool.get();
if (conn == null) {
@@ -144,7 +155,9 @@ public class SocketConnectionFactory imp
}
try {
- conn.lock.tryLock(2, TimeUnit.SECONDS);
+ if (!conn.lock.tryLock(2, TimeUnit.SECONDS)) {
+ throw new InterruptedException();
+ }
} catch (InterruptedException e) {
Thread.interrupted();
pool.put(conn);
@@ -154,7 +167,7 @@ public class SocketConnectionFactory imp
final OutputStream ouputStream = conn.getOuputStream();
if (conn.socket.isClosed()) {
pool.put(null);
- return getConnection(uri);
+ return this.getConnection(uri);
}
try {
@@ -184,7 +197,7 @@ public class SocketConnectionFactory imp
private Pool getPool(final URI uri) {
Pool pool = connections.get(uri);
if (pool == null) {
- pool = new Pool(uri, getSize(), this.timeoutPool);
+ pool = new Pool(uri, this.getSize(), this.timeoutPool);
connections.put(uri, pool);
}
return pool;
@@ -212,7 +225,7 @@ public class SocketConnectionFactory imp
try {
- cleanUp();
+ this.cleanUp();
} finally {
super.finalize();
@@ -220,25 +233,25 @@ public class SocketConnectionFactory imp
}
private void cleanUp() {
- if (null != in) {
+ if (null != this.in) {
try {
- in.close();
+ this.in.close();
} catch (Throwable e) {
//Ignore
}
}
- if (null != out) {
+ if (null != this.out) {
try {
- out.close();
+ this.out.close();
} catch (Throwable e) {
//Ignore
}
}
- if (null != socket) {
+ if (null != this.socket) {
try {
- socket.close();
+ this.socket.close();
} catch (Throwable e) {
//Ignore
}
@@ -256,7 +269,7 @@ public class SocketConnectionFactory imp
final String scheme = uri.getScheme();
if (scheme.equalsIgnoreCase("ejbds") ||
scheme.equalsIgnoreCase("zejbds")) {
final SSLSocket sslSocket = (SSLSocket)
SSLSocketFactory.getDefault().createSocket();
- sslSocket.setEnabledCipherSuites(enabledCipherSuites);
+
sslSocket.setEnabledCipherSuites(SocketConnectionFactory.this.enabledCipherSuites);
this.socket = sslSocket;
} else {
@@ -264,11 +277,11 @@ public class SocketConnectionFactory imp
}
if (scheme.startsWith("z")) {
- gzip = true;
+ this.gzip = true;
}
this.socket.setTcpNoDelay(true);
- this.socket.setSoLinger(true, 10);
+ this.socket.setSoLinger(true,
SocketConnectionFactory.this.timeoutLinger);
this.socket.connect(address,
SocketConnectionFactory.this.timeoutSocket);
Client.fireEvent(new ConnectionOpened(uri));
@@ -296,10 +309,10 @@ public class SocketConnectionFactory imp
@Override
public void discard() {
try {
- pool.put(null);
+ this.pool.put(null);
} finally {
- discarded = true;
- cleanUp();
+ this.discarded = true;
+ this.cleanUp();
}
// don't bother unlocking it
@@ -308,15 +321,19 @@ public class SocketConnectionFactory imp
@Override
public URI getURI() {
- return uri;
+ return this.uri;
}
@Override
public void close() throws IOException {
- if (discarded) return;
+ if (this.discarded) return;
- pool.put(this);
- lock.unlock();
+ this.pool.put(this);
+ try {
+ this.lock.unlock();
+ } catch (IllegalMonitorStateException e) {
+ //Ignore
+ }
}
@Override
@@ -325,15 +342,15 @@ public class SocketConnectionFactory imp
/* Open input streams */
/*----------------------------------*/
try {
- if (in == null) {
- if (!gzip) {
- in = new BufferedInputStream(socket.getInputStream());
+ if (this.in == null) {
+ if (!this.gzip) {
+ this.in = new
BufferedInputStream(this.socket.getInputStream());
} else {
- in = new GZIPInputStream(new
BufferedInputStream(socket.getInputStream()));
+ this.in = new GZIPInputStream(new
BufferedInputStream(this.socket.getInputStream()));
}
}
- return new Input(in);
+ return new Input(this.in);
} catch (StreamCorruptedException e) {
throw this.failure("Cannot open input stream to server, the
stream has been corrupted: " + e.getClass().getName(), e);
@@ -351,15 +368,15 @@ public class SocketConnectionFactory imp
try {
- if (out == null) {
- if (!gzip) {
- out = new
BufferedOutputStream(socket.getOutputStream());
+ if (this.out == null) {
+ if (!this.gzip) {
+ this.out = new
BufferedOutputStream(this.socket.getOutputStream());
} else {
- out = new BufferedOutputStream(new
FlushableGZIPOutputStream(socket.getOutputStream()));
+ this.out = new BufferedOutputStream(new
FlushableGZIPOutputStream(this.socket.getOutputStream()));
}
}
- return new Output(out);
+ return new Output(this.out);
} catch (IOException e) {
throw this.failure("Cannot open output stream to server: " +
e.getClass().getName(), e);
@@ -388,7 +405,7 @@ public class SocketConnectionFactory imp
@Override
public void close() throws IOException {
- flush();
+ this.flush();
}
}
@@ -409,38 +426,38 @@ public class SocketConnectionFactory imp
this.timeUnit = TimeUnit.MILLISECONDS;
for (int i = 0; i < size; i++) {
- pool.push(null);
+ this.pool.push(null);
}
- Client.fireEvent(new ConnectionPoolCreated(uri, size, timeout,
timeUnit));
+ Client.fireEvent(new ConnectionPoolCreated(uri, size, timeout,
this.timeUnit));
}
public SocketConnection get() throws IOException {
try {
- if (semaphore.tryAcquire(timeout, timeUnit)) {
- return pool.pop();
+ if (this.semaphore.tryAcquire(this.timeout, this.timeUnit)) {
+ return this.pool.pop();
}
} catch (InterruptedException e) {
Thread.interrupted();
}
- final ConnectionPoolTimeoutException exception = new
ConnectionPoolTimeoutException("No connections available in pool (size " + size
+ "). Waited for " + timeout + " milliseconds for a connection.");
+ final ConnectionPoolTimeoutException exception = new
ConnectionPoolTimeoutException("No connections available in pool (size " +
this.size + "). Waited for " + this.timeout + " milliseconds for a
connection.");
exception.fillInStackTrace();
- Client.fireEvent(new ConnectionPoolTimeout(uri, size, timeout,
timeUnit, exception));
+ Client.fireEvent(new ConnectionPoolTimeout(this.uri, this.size,
this.timeout, this.timeUnit, exception));
throw exception;
}
public void put(final SocketConnection connection) {
- pool.push(connection);
- semaphore.release();
+ this.pool.push(connection);
+ this.semaphore.release();
}
@Override
public String toString() {
return "Pool{" +
- "size=" + size +
- ", available=" + semaphore.availablePermits() +
- ", uri=" + uri +
+ "size=" + this.size +
+ ", available=" + this.semaphore.availablePermits() +
+ ", uri=" + this.uri +
'}';
}
}
Modified:
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=1420056&r1=1420055&r2=1420056&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
(original)
+++
openejb/trunk/openejb/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Tue Dec 11 09:51:09 2012
@@ -61,6 +61,7 @@ public class KeepAliveServer implements
private Timer timer;
private final boolean gzip;
+ @SuppressWarnings("deprecation")
public KeepAliveServer() {
this(new EjbServer());
}
@@ -81,7 +82,7 @@ public class KeepAliveServer implements
return;
}
- final BlockingQueue<Runnable> queue = getQueue();
+ final BlockingQueue<Runnable> queue = this.getQueue();
if (queue == null) return;
int backlog = queue.size();
@@ -94,11 +95,11 @@ public class KeepAliveServer implements
for (final Session session : current) {
- final Lock l = session.usage;
+ final Lock l = session.lock;
if (l.tryLock()) {
try {
- if (now - session.lastRequest > timeout) {
+ if (now - session.lastRequest > this.timeout) {
backlog--;
@@ -107,7 +108,7 @@ public class KeepAliveServer implements
} catch (Throwable e) {
//Ignore
} finally {
- removeSession(session);
+ this.removeSession(session);
}
}
} finally {
@@ -127,7 +128,7 @@ public class KeepAliveServer implements
for (final Session session : current) {
- final Lock l = session.usage;
+ final Lock l = session.lock;
if (l.tryLock()) {
try {
@@ -135,7 +136,7 @@ public class KeepAliveServer implements
} catch (Throwable e) {
//Ignore
} finally {
- removeSession(session);
+ this.removeSession(session);
l.unlock();
}
} else if (logger.isDebugEnabled()) {
@@ -183,7 +184,7 @@ public class KeepAliveServer implements
private final Thread thread;
private final KeepAliveServer kas;
- private final Lock usage = new ReentrantLock();
+ private final Lock lock = new ReentrantLock();
// only used inside the Lock
private long lastRequest;
@@ -206,7 +207,7 @@ public class KeepAliveServer implements
try {
final InputStream in;
final OutputStream out;
- if (!gzip) {
+ if (!KeepAliveServer.this.gzip) {
in = new BufferedInputStream(socket.getInputStream());
out = new BufferedOutputStream(socket.getOutputStream());
} else {
@@ -214,7 +215,7 @@ public class KeepAliveServer implements
out = new BufferedOutputStream(new
FlushableGZIPOutputStream(socket.getOutputStream()));
}
- while (running.get()) {
+ while (KeepAliveServer.this.running.get()) {
try {
i = in.read();
} catch (SocketException e) {
@@ -227,9 +228,10 @@ public class KeepAliveServer implements
}
final KeepAliveStyle style = KeepAliveStyle.values()[i];
- final Lock l = this.usage;
+ final Lock l = this.lock;
+ l.lock();
+
try {
- l.lock();
switch (style) {
case PING_PING: {
@@ -244,7 +246,7 @@ public class KeepAliveServer implements
}
try {
- service.service(new Input(in), new Output(out));
+ KeepAliveServer.this.service.service(new
Input(in), new Output(out));
out.flush();
} catch (SocketException e) {
// Socket closed.
@@ -277,17 +279,17 @@ public class KeepAliveServer implements
@Override
public String getIP() {
- return service.getIP();
+ return this.service.getIP();
}
@Override
public String getName() {
- return service.getName();
+ return this.service.getName();
}
@Override
public int getPort() {
- return service.getPort();
+ return this.service.getPort();
}
@Override
@@ -316,7 +318,7 @@ public class KeepAliveServer implements
@Override
public void init(final Properties props) throws Exception {
- service.init(props);
+ this.service.init(props);
}
public class Input extends java.io.FilterInputStream {
@@ -337,7 +339,7 @@ public class KeepAliveServer implements
@Override
public void close() throws IOException {
- flush();
+ this.flush();
}
}