Author: andygumbrecht
Date: Wed Oct  9 12:35:05 2013
New Revision: 1530583

URL: http://svn.apache.org/r1530583
Log:
Fix https://issues.apache.org/jira/browse/OPENEJB-2043

Modified:
    
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
    
tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java

Modified: 
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- 
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
 (original)
+++ 
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
 Wed Oct  9 12:35:05 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
@@ -66,8 +67,24 @@ public class MulticastPulseClient extend
     private static final int TTL = 
Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32"));
     private static final int LIMIT = 
Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT, 
"50000"));
     private static final Map<URI, Set<URI>> knownUris = new HashMap<URI, 
Set<URI>>();
-    private static final NetworkInterface[] interfaces = 
getNetworkInterfaces();
-    private static final ExecutorService executor = 
Executors.newFixedThreadPool(interfaces.length + 1);
+    private static NetworkInterface[] interfaces = getNetworkInterfaces();
+    private static ExecutorService executor = null;
+
+    private static synchronized NetworkInterface[] getInterfaces() {
+        if (null == interfaces) {
+            interfaces = getNetworkInterfaces();
+        }
+
+        return interfaces;
+    }
+
+    private static synchronized ExecutorService getExecutorService() {
+        if (null == executor) {
+            executor = Executors.newFixedThreadPool(getInterfaces().length + 
2);
+        }
+
+        return executor;
+    }
 
     /**
      * @param uri Connection URI
@@ -117,7 +134,7 @@ public class MulticastPulseClient extend
             try {
                 //Strip serverhost and group and try to connect
                 return 
ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
-            } catch (Throwable e) {
+            } catch (Exception e) {
                 uriSet.remove(serviceURI);
             }
         }
@@ -200,7 +217,7 @@ public class MulticastPulseClient extend
 
                     //Compare URI hosts
                     int i = compare(u1.getHost(), u2.getHost());
-                    if (i == 0) {
+                    if (i != 0) {
                         i = uri1.compareTo(uri2);
                     }
 
@@ -223,7 +240,7 @@ public class MulticastPulseClient extend
                         } else if (0 != h1.compareTo(h2)) {
                             return -1;
                         }
-                    } catch (Throwable e) {
+                    } catch (Exception e) {
                         //Ignore
                     }
 
@@ -240,7 +257,7 @@ public class MulticastPulseClient extend
 
             for (final MulticastSocket socket : clientSocketsFinal) {
 
-                futures.add(executor.submit(new Runnable() {
+                futures.add(getExecutorService().submit(new Runnable() {
                     @Override
                     public void run() {
                         try {
@@ -258,6 +275,10 @@ public class MulticastPulseClient extend
 
                                         int len = response.getLength();
                                         if (len > 2048) {
+
+                                            if (log.isLoggable(Level.FINE)) {
+                                                log.log(Level.FINE, 
"Truncating multipulse length {0} to 2048", new Object[]{len});
+                                            }
                                             len = 2048;
                                         }
 
@@ -288,7 +309,7 @@ public class MulticastPulseClient extend
                                                 final URI serviceUri;
                                                 try {
                                                     serviceUri = 
URI.create(svc);
-                                                } catch (Throwable e) {
+                                                } catch (Exception e) {
                                                     continue;
                                                 }
 
@@ -327,7 +348,7 @@ public class MulticastPulseClient extend
                                                             //Just add as is
                                                             
set.add(URI.create(svcfull));
                                                         }
-                                                    } catch (Throwable e) {
+                                                    } catch (Exception e) {
                                                         //Ignore
                                                     } finally {
                                                         setLock.unlock();
@@ -337,19 +358,19 @@ public class MulticastPulseClient extend
                                         }
                                     }
 
-                                } catch (Throwable e) {
+                                } catch (Exception e) {
                                     //Ignore
                                 }
                             }
                         } finally {
                             try {
                                 socket.leaveGroup(ia);
-                            } catch (Throwable e) {
+                            } catch (Exception e) {
                                 //Ignore
                             }
                             try {
                                 socket.close();
-                            } catch (Throwable e) {
+                            } catch (Exception e) {
                                 //Ignore
                             }
                         }
@@ -361,8 +382,9 @@ public class MulticastPulseClient extend
                 //Give listener threads a reasonable amount of time to start
                 if (latchListeners.await(5, TimeUnit.SECONDS)) {
 
-                    //Start pulsing request every 20ms - This will ensure we 
have at least 2 pulses within our minimum timeout
-                    futures.add(0, executor.submit(new Runnable() {
+                    //Start pulsing client request every 10ms - This will 
ensure we have at least 4 client pulses within our minimum timeout
+                    //This pulse is designed to tell a listening server to 
wake up and pulse back a response
+                    futures.add(0, getExecutorService().submit(new Runnable() {
                         @Override
                         public void run() {
                             while (running.get()) {
@@ -372,7 +394,7 @@ public class MulticastPulseClient extend
                                     if (running.get()) {
                                         try {
                                             socket.send(request);
-                                        } catch (Throwable e) {
+                                        } catch (Exception e) {
                                             //Ignore
                                         }
                                     } else {
@@ -382,7 +404,7 @@ public class MulticastPulseClient extend
 
                                 if (running.get()) {
                                     try {
-                                        Thread.sleep(20);
+                                        Thread.sleep(10);
                                     } catch (InterruptedException e) {
                                         break;
                                     }
@@ -414,12 +436,12 @@ public class MulticastPulseClient extend
 
                         try {
                             socket.leaveGroup(ia);
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             //Ignore
                         }
                         try {
                             socket.close();
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             //Ignore
                         }
                     }
@@ -430,7 +452,7 @@ public class MulticastPulseClient extend
             for (final Future future : futures) {
                 try {
                     future.get();
-                } catch (Throwable e) {
+                } catch (Exception e) {
                     //Ignore
                 }
             }
@@ -446,12 +468,12 @@ public class MulticastPulseClient extend
 
                 try {
                     socket.leaveGroup(ia);
-                } catch (Throwable e) {
+                } catch (Exception e) {
                     //Ignore
                 }
                 try {
                     socket.close();
-                } catch (Throwable e) {
+                } catch (Exception e) {
                     //Ignore
                 }
             }
@@ -501,7 +523,7 @@ public class MulticastPulseClient extend
 
         final ArrayList<MulticastSocket> list = new 
ArrayList<MulticastSocket>();
 
-        for (final NetworkInterface ni : interfaces) {
+        for (final NetworkInterface ni : getInterfaces()) {
 
             MulticastSocket ms = null;
 
@@ -518,12 +540,12 @@ public class MulticastPulseClient extend
 
                 list.add(ms);
 
-            } catch (Throwable e) {
+            } catch (Exception e) {
 
                 if (null != ms) {
                     try {
                         ms.close();
-                    } catch (Throwable t) {
+                    } catch (Exception t) {
                         //Ignore
                     }
                 }
@@ -614,11 +636,14 @@ public class MulticastPulseClient extend
                     Set<URI> uriSet = null;
                     try {
                         uriSet = MulticastPulseClient.discoverURIs(discover, 
new HashSet<String>(Arrays.asList("ejbd", "ejbds", "http", "https")), mchost, 
mcport, timeout);
-                    } catch (Throwable e) {
+                    } catch (Exception e) {
                         System.err.println(e.getMessage());
                     }
 
-                    if (uriSet != null && uriSet.size() > 0) {
+                    final int size = uriSet.size();
+                    if (uriSet != null && size > 0) {
+
+                        final int st = (timeout / size);
 
                         for (final URI uri : uriSet) {
 
@@ -636,22 +661,24 @@ public class MulticastPulseClient extend
                                 continue;
                             }
 
+                            System.out.print(server + ":" + group + " - " + 
uriSub.toASCIIString() + " is reachable: ");
+
                             boolean b = false;
                             final Socket s = new Socket();
                             try {
-                                s.connect(new InetSocketAddress(host, port), 
500);
+                                s.connect(new InetSocketAddress(host, port), 
st);
                                 b = true;
-                            } catch (Throwable e) {
+                            } catch (Exception e) {
                                 //Ignore
                             } finally {
                                 try {
                                     s.close();
-                                } catch (Throwable e) {
+                                } catch (Exception e) {
                                     //Ignore
                                 }
                             }
 
-                            System.out.println(server + ":" + group + " - " + 
uriSub.toASCIIString() + " is reachable: " + b);
+                            System.out.println(b);
                         }
                     } else {
                         System.out.println("### Failed to discover server: " + 
discover);

Modified: 
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- 
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
 (original)
+++ 
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
 Wed Oct  9 12:35:05 2013
@@ -59,7 +59,7 @@ public class MulticastPulseAgent impleme
 
     private static final Logger log = 
Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"),
 MulticastPulseAgent.class);
     private static final NetworkInterface[] interfaces = 
getNetworkInterfaces();
-    private static final ExecutorService executor = 
Executors.newFixedThreadPool(interfaces.length + 1);
+    private static final ExecutorService executor = 
Executors.newFixedThreadPool((interfaces.length + 2) * 2);
     private static final Charset UTF8 = Charset.forName("UTF-8");
     private static final int TTL = 
Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl", "32"));
 
@@ -82,9 +82,9 @@ public class MulticastPulseAgent impleme
     private boolean loopbackOnly = true;
 
     /**
-     * This agent listens for a client pulse on a defined multicast channel.
+     * This agent listens for client pulses on a defined multicast channel.
      * On receipt of a valid pulse the agent responds with its own pulse for
-     * a defined amount of time. A client can deliver a pulse as often as
+     * a defined amount of time and rate. A client can deliver a pulse as 
often as
      * required until it is happy of the server response.
      * <p/>
      * Both server and client deliver crafted information payloads.
@@ -112,7 +112,7 @@ public class MulticastPulseAgent impleme
             log.warning("Invalid ignore parameter. Should be a lowercase 
single or comma seperated list like: ignore=host1,host2");
         }
 
-        this.multicast = p.getProperty("bind", this.multicast);
+        this.multicast = o.get("bind", this.multicast);
         this.port = o.get("port", this.port);
         this.group = o.get("group", this.group);
 
@@ -225,6 +225,9 @@ public class MulticastPulseAgent impleme
             }
 
             final CountDownLatch latch = new 
CountDownLatch(this.sockets.length);
+            final String mpg = MulticastPulseAgent.this.group;
+            final boolean isLoopBackOnly = 
MulticastPulseAgent.this.loopbackOnly;
+            final DatagramPacket mpr = MulticastPulseAgent.this.response;
 
             for (final MulticastSocket socket : this.sockets) {
 
@@ -243,34 +246,67 @@ public class MulticastPulseAgent impleme
 
                                 if (null != sa) {
 
-                                    String s = new String(request.getData(), 
0, request.getLength());
+                                    final String req = new 
String(request.getData(), 0, request.getLength());
 
-                                    if (s.startsWith(CLIENT)) {
-
-                                        s = (s.replace(CLIENT, ""));
-
-                                        final String client = 
((InetSocketAddress) sa).getAddress().getHostAddress();
-
-                                        if 
(MulticastPulseAgent.this.group.equals(s) || "*".equals(s)) {
-
-                                            if 
(MulticastPulseAgent.this.loopbackOnly) {
-                                                //We only have local services, 
so make sure the request is from a local source else ignore it
-                                                if 
(!MulticastPulseAgent.isLocalAddress(client, false)) {
-                                                    
log.debug(String.format("Ignoring remote client %1$s pulse request for group: 
%2$s - No remote services available", client, s));
-                                                    continue;
+                                    executor.execute(new Runnable() {
+                                        @Override
+                                        public void run() {
+
+                                            String s = req;
+
+                                            if (s.startsWith(CLIENT)) {
+
+                                                s = (s.replace(CLIENT, ""));
+
+                                                if (mpg.equals(s) || 
"*".equals(s)) {
+
+                                                    final String client = 
((InetSocketAddress) sa).getAddress().getHostAddress();
+
+                                                    if (isLoopBackOnly) {
+                                                        //We only have local 
services, so make sure the request is from a local source else ignore it
+                                                        if 
(!MulticastPulseAgent.isLocalAddress(client, false)) {
+                                                            if 
(log.isDebugEnabled()) {
+                                                                
log.debug(String.format("Ignoring remote client %1$s pulse request for group: 
%2$s - No remote services available",
+                                                                               
         client,
+                                                                               
         s));
+                                                            }
+                                                            return;
+                                                        }
+                                                    }
+
+                                                    if (log.isDebugEnabled()) {
+                                                        
log.debug(String.format("Answering client %1$s pulse request for group: %2$s", 
client, s));
+                                                    }
+
+                                                    //This is a valid client 
request for the server to respond on the same channel.
+                                                    //Because multicast is not 
guaranteed we will send 3 responses per valid request at 10ms intervals.
+                                                    for (int i = 0; i < 3; 
i++) {
+
+                                                        try {
+                                                            socket.send(mpr);
+                                                        } catch (Exception e) {
+                                                            if 
(log.isDebugEnabled()) {
+                                                                
log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
+                                                            }
+                                                        }
+
+                                                        try {
+                                                            Thread.sleep(10);
+                                                        } catch 
(InterruptedException e) {
+                                                            break;
+                                                        }
+                                                    }
                                                 }
                                             }
-
-                                            log.debug(String.format("Answering 
client %1$s pulse request for group: %2$s", client, s));
-                                            
socket.send(MulticastPulseAgent.this.response);
-                                        } else {
-                                            log.debug(String.format("Ignoring 
client %1$s pulse request for group: %2$s", client, s));
                                         }
-                                    }
+                                    });
+
                                 }
 
-                            } catch (Throwable e) {
-                                //Ignore
+                            } catch (Exception e) {
+                                if (log.isDebugEnabled()) {
+                                    log.debug("MulticastPulseAgent request 
error: " + e.getMessage(), e);
+                                }
                             }
                         }
 

Modified: 
tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: 
http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- 
tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
 (original)
+++ 
tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
 Wed Oct  9 12:35:05 2013
@@ -141,7 +141,7 @@ public class MulticastPulseAgentTest {
 
                 //Compare URI hosts
                 int i = compare(u1.getHost(), u2.getHost());
-                if (i == 0) {
+                if (i != 0) {
                     i = uri1.compareTo(uri2);
                 }
 


Reply via email to