Author: andygumbrecht Date: Wed Oct 9 12:35:05 2013 New Revision: 1530583 URL: http://svn.apache.org/r1530583 Log: Fix https://issues.apache.org/jira/browse/OPENEJB-2043
Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1530583&r1=1530582&r2=1530583&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original) +++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Wed Oct 9 12:35:05 2013 @@ -35,6 +35,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -66,8 +67,24 @@ public class MulticastPulseClient extend private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32")); private static final int LIMIT = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT, "50000")); private static final Map<URI, Set<URI>> knownUris = new HashMap<URI, Set<URI>>(); - private static final NetworkInterface[] interfaces = getNetworkInterfaces(); - private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length + 1); + private static NetworkInterface[] interfaces = getNetworkInterfaces(); + private static ExecutorService executor = null; + + private static synchronized NetworkInterface[] getInterfaces() { + if (null == interfaces) { + interfaces = getNetworkInterfaces(); + } + + return interfaces; + } + + private static synchronized ExecutorService getExecutorService() { + if (null == executor) { + executor = Executors.newFixedThreadPool(getInterfaces().length + 2); + } + + return executor; + } /** * @param uri Connection URI @@ -117,7 +134,7 @@ public class MulticastPulseClient extend try { //Strip serverhost and group and try to connect return ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart())); - } catch (Throwable e) { + } catch (Exception e) { uriSet.remove(serviceURI); } } @@ -200,7 +217,7 @@ public class MulticastPulseClient extend //Compare URI hosts int i = compare(u1.getHost(), u2.getHost()); - if (i == 0) { + if (i != 0) { i = uri1.compareTo(uri2); } @@ -223,7 +240,7 @@ public class MulticastPulseClient extend } else if (0 != h1.compareTo(h2)) { return -1; } - } catch (Throwable e) { + } catch (Exception e) { //Ignore } @@ -240,7 +257,7 @@ public class MulticastPulseClient extend for (final MulticastSocket socket : clientSocketsFinal) { - futures.add(executor.submit(new Runnable() { + futures.add(getExecutorService().submit(new Runnable() { @Override public void run() { try { @@ -258,6 +275,10 @@ public class MulticastPulseClient extend int len = response.getLength(); if (len > 2048) { + + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len}); + } len = 2048; } @@ -288,7 +309,7 @@ public class MulticastPulseClient extend final URI serviceUri; try { serviceUri = URI.create(svc); - } catch (Throwable e) { + } catch (Exception e) { continue; } @@ -327,7 +348,7 @@ public class MulticastPulseClient extend //Just add as is set.add(URI.create(svcfull)); } - } catch (Throwable e) { + } catch (Exception e) { //Ignore } finally { setLock.unlock(); @@ -337,19 +358,19 @@ public class MulticastPulseClient extend } } - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } } finally { try { socket.leaveGroup(ia); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } try { socket.close(); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } @@ -361,8 +382,9 @@ public class MulticastPulseClient extend //Give listener threads a reasonable amount of time to start if (latchListeners.await(5, TimeUnit.SECONDS)) { - //Start pulsing request every 20ms - This will ensure we have at least 2 pulses within our minimum timeout - futures.add(0, executor.submit(new Runnable() { + //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout + //This pulse is designed to tell a listening server to wake up and pulse back a response + futures.add(0, getExecutorService().submit(new Runnable() { @Override public void run() { while (running.get()) { @@ -372,7 +394,7 @@ public class MulticastPulseClient extend if (running.get()) { try { socket.send(request); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } else { @@ -382,7 +404,7 @@ public class MulticastPulseClient extend if (running.get()) { try { - Thread.sleep(20); + Thread.sleep(10); } catch (InterruptedException e) { break; } @@ -414,12 +436,12 @@ public class MulticastPulseClient extend try { socket.leaveGroup(ia); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } try { socket.close(); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } @@ -430,7 +452,7 @@ public class MulticastPulseClient extend for (final Future future : futures) { try { future.get(); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } @@ -446,12 +468,12 @@ public class MulticastPulseClient extend try { socket.leaveGroup(ia); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } try { socket.close(); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } @@ -501,7 +523,7 @@ public class MulticastPulseClient extend final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>(); - for (final NetworkInterface ni : interfaces) { + for (final NetworkInterface ni : getInterfaces()) { MulticastSocket ms = null; @@ -518,12 +540,12 @@ public class MulticastPulseClient extend list.add(ms); - } catch (Throwable e) { + } catch (Exception e) { if (null != ms) { try { ms.close(); - } catch (Throwable t) { + } catch (Exception t) { //Ignore } } @@ -614,11 +636,14 @@ public class MulticastPulseClient extend Set<URI> uriSet = null; try { uriSet = MulticastPulseClient.discoverURIs(discover, new HashSet<String>(Arrays.asList("ejbd", "ejbds", "http", "https")), mchost, mcport, timeout); - } catch (Throwable e) { + } catch (Exception e) { System.err.println(e.getMessage()); } - if (uriSet != null && uriSet.size() > 0) { + final int size = uriSet.size(); + if (uriSet != null && size > 0) { + + final int st = (timeout / size); for (final URI uri : uriSet) { @@ -636,22 +661,24 @@ public class MulticastPulseClient extend continue; } + System.out.print(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: "); + boolean b = false; final Socket s = new Socket(); try { - s.connect(new InetSocketAddress(host, port), 500); + s.connect(new InetSocketAddress(host, port), st); b = true; - } catch (Throwable e) { + } catch (Exception e) { //Ignore } finally { try { s.close(); - } catch (Throwable e) { + } catch (Exception e) { //Ignore } } - System.out.println(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: " + b); + System.out.println(b); } } else { System.out.println("### Failed to discover server: " + discover); Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1530583&r1=1530582&r2=1530583&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original) +++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Wed Oct 9 12:35:05 2013 @@ -59,7 +59,7 @@ public class MulticastPulseAgent impleme private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"), MulticastPulseAgent.class); private static final NetworkInterface[] interfaces = getNetworkInterfaces(); - private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length + 1); + private static final ExecutorService executor = Executors.newFixedThreadPool((interfaces.length + 2) * 2); private static final Charset UTF8 = Charset.forName("UTF-8"); private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl", "32")); @@ -82,9 +82,9 @@ public class MulticastPulseAgent impleme private boolean loopbackOnly = true; /** - * This agent listens for a client pulse on a defined multicast channel. + * This agent listens for client pulses on a defined multicast channel. * On receipt of a valid pulse the agent responds with its own pulse for - * a defined amount of time. A client can deliver a pulse as often as + * a defined amount of time and rate. A client can deliver a pulse as often as * required until it is happy of the server response. * <p/> * Both server and client deliver crafted information payloads. @@ -112,7 +112,7 @@ public class MulticastPulseAgent impleme log.warning("Invalid ignore parameter. Should be a lowercase single or comma seperated list like: ignore=host1,host2"); } - this.multicast = p.getProperty("bind", this.multicast); + this.multicast = o.get("bind", this.multicast); this.port = o.get("port", this.port); this.group = o.get("group", this.group); @@ -225,6 +225,9 @@ public class MulticastPulseAgent impleme } final CountDownLatch latch = new CountDownLatch(this.sockets.length); + final String mpg = MulticastPulseAgent.this.group; + final boolean isLoopBackOnly = MulticastPulseAgent.this.loopbackOnly; + final DatagramPacket mpr = MulticastPulseAgent.this.response; for (final MulticastSocket socket : this.sockets) { @@ -243,34 +246,67 @@ public class MulticastPulseAgent impleme if (null != sa) { - String s = new String(request.getData(), 0, request.getLength()); + final String req = new String(request.getData(), 0, request.getLength()); - if (s.startsWith(CLIENT)) { - - s = (s.replace(CLIENT, "")); - - final String client = ((InetSocketAddress) sa).getAddress().getHostAddress(); - - if (MulticastPulseAgent.this.group.equals(s) || "*".equals(s)) { - - if (MulticastPulseAgent.this.loopbackOnly) { - //We only have local services, so make sure the request is from a local source else ignore it - if (!MulticastPulseAgent.isLocalAddress(client, false)) { - log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available", client, s)); - continue; + executor.execute(new Runnable() { + @Override + public void run() { + + String s = req; + + if (s.startsWith(CLIENT)) { + + s = (s.replace(CLIENT, "")); + + if (mpg.equals(s) || "*".equals(s)) { + + final String client = ((InetSocketAddress) sa).getAddress().getHostAddress(); + + if (isLoopBackOnly) { + //We only have local services, so make sure the request is from a local source else ignore it + if (!MulticastPulseAgent.isLocalAddress(client, false)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available", + client, + s)); + } + return; + } + } + + if (log.isDebugEnabled()) { + log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s)); + } + + //This is a valid client request for the server to respond on the same channel. + //Because multicast is not guaranteed we will send 3 responses per valid request at 10ms intervals. + for (int i = 0; i < 3; i++) { + + try { + socket.send(mpr); + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("MulticastPulseAgent client error: " + e.getMessage(), e); + } + } + + try { + Thread.sleep(10); + } catch (InterruptedException e) { + break; + } + } } } - - log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s)); - socket.send(MulticastPulseAgent.this.response); - } else { - log.debug(String.format("Ignoring client %1$s pulse request for group: %2$s", client, s)); } - } + }); + } - } catch (Throwable e) { - //Ignore + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("MulticastPulseAgent request error: " + e.getMessage(), e); + } } } Modified: tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1530583&r1=1530582&r2=1530583&view=diff ============================================================================== --- tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java (original) +++ tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Wed Oct 9 12:35:05 2013 @@ -141,7 +141,7 @@ public class MulticastPulseAgentTest { //Compare URI hosts int i = compare(u1.getHost(), u2.getHost()); - if (i == 0) { + if (i != 0) { i = uri1.compareTo(uri2); }