Author: andygumbrecht Date: Wed Apr 10 13:27:40 2013 New Revision: 1466480 URL: http://svn.apache.org/r1466480 Log: MulticastPulseClient should pulse at regular intervals until the lookup timeout is reached, rather than just once per lookup (missed that, even though I had left a thread spare). AbstractConnectionStrategy: synchronize connection construction. Removed duplicate line in ClientDataSource. Finals and overrides.
Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java?rev=1466480&r1=1466479&r2=1466480&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java (original) +++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/AbstractConnectionStrategy.java Wed Apr 10 13:27:40 2013 @@ -20,59 +20,77 @@ import org.apache.openejb.client.event.B import org.apache.openejb.client.event.FailoverSelection; import java.io.IOException; -import java.net.ConnectException; import java.net.URI; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; /** * @version $Rev$ $Date$ */ public abstract class AbstractConnectionStrategy implements ConnectionStrategy { + private final ReentrantLock lock = new ReentrantLock(true); + @Override - public Connection connect(ClusterMetaData cluster, ServerMetaData server) throws IOException { - final Set<URI> failed = Client.getFailed(); - final Set<URI> remaining = new HashSet<URI>(); + public Connection connect(final ClusterMetaData cluster, final ServerMetaData 
server) throws IOException { - boolean failover = false; + final ReentrantLock l = lock; + l.lock(); - final Iterable<URI> iterable = getIterable(cluster); - for (URI uri : iterable) { - if (failed.contains(uri)) continue; + try { + final Set<URI> failed = Client.getFailed(); + final Set<URI> remaining = new HashSet<URI>(); - if (failover) Client.fireEvent(createFailureEvent(remaining, failed, uri)); + boolean failover = false; - try { - return connect(cluster, uri); - } catch (IOException e) { + final Iterable<URI> iterable = getIterable(cluster); + for (final URI uri : iterable) { + if (failed.contains(uri)) { + continue; + } - if (!failover) { - Collections.addAll(remaining, cluster.getLocations()); - remaining.removeAll(failed); + if (failover) { + Client.fireEvent(createFailureEvent(remaining, failed, uri)); } - failed.add(uri); - remaining.remove(uri); - failover = true; + try { + return connect(cluster, uri); + } catch (IOException e) { + + if (!failover) { + Collections.addAll(remaining, cluster.getLocations()); + remaining.removeAll(failed); + } + + failed.add(uri); + remaining.remove(uri); + failover = true; + } } - } - final URI uri = server.getLocation(); + final URI uri = server.getLocation(); - if (uri == null) throw new RemoteFailoverException("Attempted to connect to " + failed.size() + " servers."); + if (uri == null) { + throw new RemoteFailoverException("Attempted to connect to " + failed.size() + " servers."); + } - Client.fireEvent(new BootstrappingConnection(uri)); + Client.fireEvent(new BootstrappingConnection(uri)); - return connect(cluster, uri); + return connect(cluster, uri); + } finally { + l.unlock(); + } } - private Iterable<URI> getIterable(ClusterMetaData cluster) { + private Iterable<URI> getIterable(final ClusterMetaData cluster) { final Context context = cluster.getContext(); final StrategyData data = context.getComponent(StrategyData.class); - if (data != null) return data.getIterable(); + if (data != null) { + return 
data.getIterable(); + } context.setComponent(StrategyData.class, new StrategyData(createIterable(cluster))); @@ -83,8 +101,8 @@ public abstract class AbstractConnection protected abstract Iterable<URI> createIterable(ClusterMetaData cluster); - protected Connection connect(ClusterMetaData cluster, URI uri) throws IOException { - Connection connection = ConnectionManager.getConnection(uri); + protected Connection connect(final ClusterMetaData cluster, final URI uri) throws IOException { + final Connection connection = ConnectionManager.getConnection(uri); // Grabbing the URI from the associated connection allows the ConnectionFactory to // employ discovery to find and connect to a server. We then attempt to connect @@ -94,9 +112,10 @@ public abstract class AbstractConnection } private static class StrategyData { + private final Iterable<URI> iterable; - private StrategyData(Iterable<URI> iterable) { + private StrategyData(final Iterable<URI> iterable) { this.iterable = iterable; } Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java?rev=1466480&r1=1466479&r2=1466480&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java (original) +++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/ClientDataSource.java Wed Apr 10 13:27:40 2013 @@ -29,17 +29,18 @@ import java.util.logging.Logger; /** * @version $Rev$ $Date$ */ +@SuppressWarnings("UseOfSystemOutOrSystemErr") public class ClientDataSource implements DataSource { + private final String jdbcUrl; private final String defaultPassword; private final String defaultUserName; - public static void main(String[] args) throws URISyntaxException { + public static void 
main(final String[] args) throws URISyntaxException { URI uri1; - uri1 = new URI("datasource", null, "/path",null, null); - uri1 = new URI("datasource", null, "/path",null, null); + uri1 = new URI("datasource", null, "/path", null, null); System.out.println("uri = " + uri1); - uri1 = new URI("datasource", "host", "/path",null, null); + uri1 = new URI("datasource", "host", "/path", null, null); System.out.println("uri = " + uri1); uri1 = new URI("datasource", "host", "/path", "query", "fragment"); System.out.println("uri = " + uri1); @@ -48,7 +49,7 @@ public class ClientDataSource implements print(new URI(uri1.getSchemeSpecificPart())); } - private static void print(URI uri1) { + private static void print(final URI uri1) { System.out.println("uri = " + uri1); System.out.println(" scheme = " + uri1.getScheme()); System.out.println(" part = " + uri1.getSchemeSpecificPart()); @@ -57,61 +58,73 @@ public class ClientDataSource implements System.out.println(" query = " + uri1.getQuery()); } - public ClientDataSource(DataSourceMetaData d) { + public ClientDataSource(final DataSourceMetaData d) { this(d.getJdbcDriver(), d.getJdbcUrl(), d.getDefaultUserName(), d.getDefaultPassword()); } - public ClientDataSource(String jdbcDriver, String jdbcUrl, String defaultUserName, String defaultPassword) { + public ClientDataSource(final String jdbcDriver, final String jdbcUrl, final String defaultUserName, final String defaultPassword) { this.defaultPassword = defaultPassword; this.defaultUserName = defaultUserName; this.jdbcUrl = jdbcUrl; - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Class.forName(jdbcDriver, true, classLoader); } catch (ClassNotFoundException e) { - throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: "+jdbcDriver, e); + throw new IllegalStateException("Cannot use DataSource in client VM without 
the JDBC Driver in classpath: " + jdbcDriver, e); } catch (NoClassDefFoundError e) { - throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: "+jdbcDriver, e); + throw new IllegalStateException("Cannot use DataSource in client VM without the JDBC Driver in classpath: " + jdbcDriver, e); } } + @Override public Connection getConnection() throws SQLException { return getConnection(defaultUserName, defaultPassword); } - public Connection getConnection(String username, String password) throws SQLException { - Connection connection = DriverManager.getConnection(jdbcUrl, username, password); - return connection; + @Override + public Connection getConnection(final String username, final String password) throws SQLException { + return DriverManager.getConnection(jdbcUrl, username, password); } + @Override public int getLoginTimeout() throws SQLException { return 0; } + @Override public PrintWriter getLogWriter() throws SQLException { return null; } - public void setLoginTimeout(int seconds) throws SQLException { + @Override + public void setLoginTimeout(final int seconds) throws SQLException { } - public void setLogWriter(PrintWriter out) throws SQLException { + @Override + public void setLogWriter(final PrintWriter out) throws SQLException { } - public boolean isWrapperFor(java.lang.Class<?> iface) { - if (iface == null) throw new NullPointerException("iface is null"); + @Override + public boolean isWrapperFor(final java.lang.Class<?> iface) { + if (iface == null) { + throw new NullPointerException("iface is null"); + } return iface.isInstance(this); } + @Override @SuppressWarnings({"unchecked"}) - public <T> T unwrap(Class<T> iface) throws SQLException { - if (iface == null) throw new NullPointerException("iface is null"); + public <T> T unwrap(final Class<T> iface) throws SQLException { + if (iface == null) { + throw new NullPointerException("iface is null"); + } if (iface.isInstance(this)) { return (T) this; } throw new 
SQLException(getClass().getName() + " does not implement " + iface.getName()); } + @SuppressWarnings("override") public Logger getParentLogger() throws SQLFeatureNotSupportedException { return null; } Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java?rev=1466480&r1=1466479&r2=1466480&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java (original) +++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/EJBObjectHandler.java Wed Apr 10 13:27:40 2013 @@ -212,51 +212,43 @@ public abstract class EJBObjectHandler e if (m.getDeclaringClass().equals(Object.class)) { - if (m.equals(TOSTRING)) + if (m.equals(TOSTRING)) { return "proxy=" + this; - - else if (m.equals(EQUALS)) + } else if (m.equals(EQUALS)) { return equals(m, a, p); - - else if (m.equals(HASHCODE)) + } else if (m.equals(HASHCODE)) { return this.hashCode(); - - else + } else { throw new UnsupportedOperationException("Unkown method: " + m); + } } else if (m.getDeclaringClass() == EJBObjectProxy.class) { - if (m.equals(GETHANDLER)) + if (m.equals(GETHANDLER)) { return this; - - else if (m.getName().equals("writeReplace")) + } else if (m.getName().equals("writeReplace")) { return new EJBObjectProxyHandle(this); - - else if (m.getName().equals("readResolve")) + } else if (m.getName().equals("readResolve")) { return null; - - else + } else { throw new UnsupportedOperationException("Unkown method: " + m); + } } else if (m.getDeclaringClass() == javax.ejb.EJBObject.class) { - if (m.equals(GETHANDLE)) + if (m.equals(GETHANDLE)) { return getHandle(m, a, p); - - else if (m.equals(GETPRIMARYKEY)) + } else if (m.equals(GETPRIMARYKEY)) { return getPrimaryKey(m, 
a, p); - - else if (m.equals(ISIDENTICAL)) + } else if (m.equals(ISIDENTICAL)) { return isIdentical(m, a, p); - - else if (m.equals(GETEJBHOME)) + } else if (m.equals(GETEJBHOME)) { return getEJBHome(m, a, p); - - else if (m.equals(REMOVE)) + } else if (m.equals(REMOVE)) { return remove(m, a, p); - - else + } else { throw new UnsupportedOperationException("Unkown method: " + m); + } } else { @@ -286,12 +278,14 @@ public abstract class EJBObjectHandler e } } catch (Throwable throwable) { if (remote) { - if (throwable instanceof RemoteException) + if (throwable instanceof RemoteException) { throw throwable; + } throw new RemoteException("Unknown Container Exception: " + throwable.getClass().getName() + ": " + throwable.getMessage(), getCause(throwable)); } else { - if (throwable instanceof EJBException) + if (throwable instanceof EJBException) { throw throwable; + } throw new EJBException("Unknown Container Exception: " + throwable.getClass().getName() + ": " + throwable.getMessage()).initCause(getCause(throwable)); } } @@ -457,9 +451,11 @@ public abstract class EJBObjectHandler e final EJBResponse res = request(req); if (res.getResponseCode() != ResponseCodes.EJB_OK) { //TODO how do we notify the user that we fail to configure the value ? 
+ Logger.getLogger(this.getClass().getName()).info("Unexpected response on cancel: " + res); } } catch (Exception e) { //TODO how to handle + Logger.getLogger(this.getClass().getName()).log(Level.INFO, "Unexpected error on cancel", e); return false; } } Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1466480&r1=1466479&r2=1466480&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original) +++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Wed Apr 10 13:27:40 2013 @@ -81,7 +81,11 @@ public class MulticastPulseClient extend if (knownUris.size() >= LIMIT) { //This is here just as a brake to prevent DOS or OOME. //There is no way we should have more than this number of unique MutliPulse URI's in a LAN - throw new IllegalArgumentException("Unique MultiPulse URI limit of " + LIMIT + " reached. Increase using the system property '" + ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT + "'"); + throw new IllegalArgumentException("Unique MultiPulse URI limit of " + + LIMIT + + " reached. Increase using the system property '" + + ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT + + "'"); } Set<URI> uriSet = knownUris.get(uri); @@ -232,7 +236,7 @@ public class MulticastPulseClient extend //Start threads that listen for multicast packets on our channel. //These need to start 'before' we pulse a request. 
final ArrayList<Future> futures = new ArrayList<Future>(); - final CountDownLatch latch = new CountDownLatch(clientSocketsFinal.length); + final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length); for (final MulticastSocket socket : clientSocketsFinal) { @@ -240,8 +244,8 @@ public class MulticastPulseClient extend @Override public void run() { try { - latch.countDown(); final DatagramPacket response = new DatagramPacket(new byte[2048], 2048); + latchListeners.countDown(); while (running.get()) { try { @@ -354,17 +358,38 @@ public class MulticastPulseClient extend } try { - //Give threads a reasonable amount of time to start - if (latch.await(5, TimeUnit.SECONDS)) { + //Give listener threads a reasonable amount of time to start + if (latchListeners.await(5, TimeUnit.SECONDS)) { - //Pulse the server - It is thread safe to use same sockets as send/receive synchronization is only on the packet - for (final MulticastSocket socket : clientSocketsFinal) { - try { - socket.send(request); - } catch (Throwable e) { - //Ignore + //Start pulsing request every 20ms - This will ensure we have at least 2 pulses within our minimum timeout + futures.add(0, executor.submit(new Runnable() { + @Override + public void run() { + while (running.get()) { + //Pulse to listening servers - It is thread safe to use same sockets as send/receive synchronization is only on the packet + for (final MulticastSocket socket : clientSocketsFinal) { + + if (running.get()) { + try { + socket.send(request); + } catch (Throwable e) { + //Ignore + } + } else { + break; + } + } + + if (running.get()) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + break; + } + } + } } - } + })); } else { timeout = 1; } Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java?rev=1466480&r1=1466479&r2=1466480&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java (original) +++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/proxy/Jdk13InvocationHandler.java Wed Apr 10 13:27:40 2013 @@ -26,7 +26,7 @@ public class Jdk13InvocationHandler impl public Jdk13InvocationHandler() { } - public Jdk13InvocationHandler(InvocationHandler delegate) { + public Jdk13InvocationHandler(final InvocationHandler delegate) { setInvocationHandler(delegate); } @@ -34,13 +34,14 @@ public class Jdk13InvocationHandler impl return delegate; } - public InvocationHandler setInvocationHandler(InvocationHandler handler) { - InvocationHandler old = delegate; + public InvocationHandler setInvocationHandler(final InvocationHandler handler) { + final InvocationHandler old = delegate; delegate = handler; return old; } - public Object invoke(Object proxy, Method method, Object... args) throws Throwable { + @Override + public Object invoke(final Object proxy, final Method method, final Object... args) throws Throwable { if (delegate == null) throw new NullPointerException("No invocation handler for proxy " + proxy); return delegate.invoke(proxy, method, args);