Author: andygumbrecht
Date: Wed Jun 13 14:06:22 2012
New Revision: 1349860

URL: http://svn.apache.org/viewvc?rev=1349860&view=rev
Log:
Prefix MultiPulse URIs with server host and group, plus some refactoring of 
MulticastPulseClient.java

Modified:
    
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java

Modified: 
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1349860&r1=1349859&r2=1349860&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
 Wed Jun 13 14:06:22 2012
@@ -71,7 +71,8 @@ public class MulticastPulseClient extend
             }
 
             try {
-                return 
ConnectionManager.getConnection(URI.create(serviceURI.getSchemeSpecificPart()));
+                //Strip serverhost and group and try to connect
+                return 
ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
             } catch (IOException e) {
                 badUri.add(serviceURI);
             }
@@ -89,40 +90,73 @@ public class MulticastPulseClient extend
 
     /**
      * Get a list of URIs discovered for the provided request.
+     * <p/>
+     * Returned URIs are of the format 
'mp-{serverhost}:group:scheme://servicehost:port'.
+     * The serverhost is prefixed with 'mp-' because it may be an 
IP address, and RFC 2396 requires a URI scheme to begin with a letter.
      *
      * @param forGroup Specific case sensitive group name or * for all
      * @param schemes  Acceptable scheme list
      * @param host     Multicast host address
      * @param port     Multicast port
-     * @param timeout  Time to wait for a server response
+     * @param timeout  Time to wait for a server response, at least 50ms
      * @return A URI set, possibly empty
      * @throws Exception On error
      */
-    public static synchronized Set<URI> discoverURIs(final String forGroup, 
final Set<String> schemes, final String host, final int port, final long 
timeout) throws Exception {
+    public static synchronized Set<URI> discoverURIs(final String forGroup, 
final Set<String> schemes, final String host, final int port, long timeout) 
throws Exception {
+
+        if (timeout < 50) {
+            timeout = 50;
+        }
+
+        final InetAddress ia;
+
+        try {
+            ia = InetAddress.getByName(host);
+        } catch (UnknownHostException e) {
+            throw new Exception(host + " is not a valid address", e);
+        }
+
+        if (null == ia || !ia.isMulticastAddress()) {
+            throw new Exception(host + " is not a valid multicast address");
+        }
+
+        final byte[] bytes = (MulticastPulseClient.CLIENT + 
forGroup).getBytes(Charset.forName("utf8"));
+        final DatagramPacket request = new DatagramPacket(bytes, bytes.length, 
new InetSocketAddress(ia, port));
+
+
+        final AtomicBoolean running = new AtomicBoolean(true);
+        final MulticastSocket client = MulticastPulseClient.getSocket(ia, 
port);
+        final Timer timer = new Timer(true);
 
         final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
             @Override
             public int compare(URI u1, URI u2) {
 
+                //Ignore server hostname
+                final String serverHost = u1.getScheme();
+                u1 = URI.create(u1.getSchemeSpecificPart());
+                u2 = URI.create(u2.getSchemeSpecificPart());
+
+                //Ignore scheme (ejb,ejbs,etc.)
                 u1 = URI.create(u1.getSchemeSpecificPart());
                 u2 = URI.create(u2.getSchemeSpecificPart());
 
+                if (u1.getHost().equals(serverHost)) {
+                    //If the service host is the same as the server host
+                    //then keep it at the top of the list
+                    return -1;
+                }
+
+                //Compare URI hosts
                 int i = u1.getHost().compareTo(u2.getHost());
 
-                if(i == 0){
+                if (i == 0) {
                     i = u1.compareTo(u2);
                 }
 
                 return i;
             }
         });
-        final byte[] bytes = (MulticastPulseClient.CLIENT + 
forGroup).getBytes(Charset.forName("utf8"));
-        final InetAddress ia = InetAddress.getByName(host);
-        final DatagramPacket request = new DatagramPacket(bytes, bytes.length, 
new InetSocketAddress(ia, port));
-
-        final AtomicBoolean running = new AtomicBoolean(true);
-        final MulticastSocket client = MulticastPulseClient.getSocket(host, 
port);
-        final Timer timer = new Timer(true);
 
         //Start a thread that listens for multicast packets on our channel.
         //This needs to start 'before' we pulse a request.
@@ -139,7 +173,7 @@ public class MulticastPulseClient extend
 
                         final SocketAddress sa = response.getSocketAddress();
 
-                        if (null != sa) {
+                        if (null != sa && (sa instanceof InetSocketAddress)) {
 
                             int len = response.getLength();
                             if (len > 2048) {
@@ -165,7 +199,7 @@ public class MulticastPulseClient extend
                                         continue;
                                     }
 
-                                    URI test = null;
+                                    final URI test;
                                     try {
                                         test = URI.create(svc);
                                     } catch (Throwable e) {
@@ -174,7 +208,11 @@ public class MulticastPulseClient extend
 
                                     if (schemes.contains(test.getScheme())) {
 
-                                        svc = (group + ":" + svc);
+                                        //Just because the multicast was received 
on this host does not mean the service is on the same host.
+                                        //We can, however, use this to identify 
an individual machine and group
+                                        final String serverHost = 
((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress();
+
+                                        svc = ("mp-" + serverHost + ":" + 
group + ":" + svc);
 
                                         try {
                                             if (svc.contains("0.0.0.0")) {
@@ -206,20 +244,29 @@ public class MulticastPulseClient extend
         t.setDaemon(true);
         t.start();
 
-        if (running.get()) {
-            //Kill the thread after timeout
-            timer.schedule(new TimerTask() {
-                @Override
-                public void run() {
-                    running.set(false);
+        //Kill the thread after timeout
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+
+                running.set(false);
+
+                try {
+                    client.leaveGroup(ia);
+                } catch (Throwable e) {
+                    //Ignore
+                }
+                try {
                     client.close();
-                    t.interrupt();
+                } catch (Throwable e) {
+                    //Ignore
                 }
-            }, timeout);
-        }
+                t.interrupt();
+            }
+        }, timeout);
 
         //Pulse the server
-        final MulticastSocket ms = MulticastPulseClient.getSocket(host, port);
+        final MulticastSocket ms = MulticastPulseClient.getSocket(ia, port);
         ms.send(request);
 
         //Wait for thread to die
@@ -238,19 +285,7 @@ public class MulticastPulseClient extend
         }
     }
 
-    public static MulticastSocket getSocket(final String multicastAddress, 
final int port) throws Exception {
-
-        final InetAddress ia;
-
-        try {
-            ia = InetAddress.getByName(multicastAddress);
-        } catch (UnknownHostException e) {
-            throw new Exception(multicastAddress + " is not a valid address", 
e);
-        }
-
-        if (null == ia || !ia.isMulticastAddress()) {
-            throw new Exception(multicastAddress + " is not a valid multicast 
address");
-        }
+    public static MulticastSocket getSocket(final InetAddress ia, final int 
port) throws Exception {
 
         MulticastSocket ms = null;
 
@@ -259,6 +294,9 @@ public class MulticastPulseClient extend
             final NetworkInterface ni = 
NetworkInterface.getByInetAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()));
             ms.setNetworkInterface(ni);
             ms.setSoTimeout(0);
+            if (!ms.getBroadcast()) {
+                ms.setBroadcast(true);
+            }
             ms.joinGroup(ia);
 
         } catch (Throwable e) {

Modified: 
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1349860&r1=1349859&r2=1349860&view=diff
==============================================================================
--- 
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
 (original)
+++ 
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
 Wed Jun 13 14:06:22 2012
@@ -79,7 +79,6 @@ public class MulticastPulseAgent impleme
         this.port = o.get("port", this.port);
         this.group = o.get("group", this.group);
 
-
         final InetAddress ia = InetAddress.getByName(this.multicast);
         this.address = new InetSocketAddress(ia, this.port);
         this.buildPacket();
@@ -297,6 +296,9 @@ public class MulticastPulseAgent impleme
             final NetworkInterface ni = 
NetworkInterface.getByInetAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()));
             ms.setNetworkInterface(ni);
             ms.setSoTimeout(0);
+            if (!ms.getBroadcast()) {
+                ms.setBroadcast(true);
+            }
             ms.joinGroup(ia);
 
             log.debug(String.format("Created MulticastSocket for '%1$s:%2$s' 
on network adapter: %3$s", multicastAddress, port, ni));


Reply via email to