Author: andygumbrecht
Date: Wed Jun 13 14:06:22 2012
New Revision: 1349860
URL: http://svn.apache.org/viewvc?rev=1349860&view=rev
Log:
Prefix MultiPulse URIs with server host and group, plus some refactoring of
MulticastPulseClient.java
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Modified:
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1349860&r1=1349859&r2=1349860&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
(original)
+++
openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
Wed Jun 13 14:06:22 2012
@@ -71,7 +71,8 @@ public class MulticastPulseClient extend
}
try {
- return
ConnectionManager.getConnection(URI.create(serviceURI.getSchemeSpecificPart()));
+ //Strip serverhost and group and try to connect
+ return
ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
} catch (IOException e) {
badUri.add(serviceURI);
}
@@ -89,40 +90,73 @@ public class MulticastPulseClient extend
/**
* Get a list of URIs discovered for the provided request.
+ * <p/>
+ * Returned URIs are of the format
'mp-{serverhost}:group:scheme://servicehost:port'.
+ * The serverhost is prefixed with 'mp-' in case the serverhost is an
IP-Address, as RFC 2396 defines scheme must begin with a 'letter'
*
* @param forGroup Specific case sensitive group name or * for all
* @param schemes Acceptable scheme list
* @param host Multicast host address
* @param port Multicast port
- * @param timeout Time to wait for a server response
+ * @param timeout Time to wait for a server response, at least 50ms
* @return A URI set, possibly empty
* @throws Exception On error
*/
- public static synchronized Set<URI> discoverURIs(final String forGroup,
final Set<String> schemes, final String host, final int port, final long
timeout) throws Exception {
+ public static synchronized Set<URI> discoverURIs(final String forGroup,
final Set<String> schemes, final String host, final int port, long timeout)
throws Exception {
+
+ if (timeout < 50) {
+ timeout = 50;
+ }
+
+ final InetAddress ia;
+
+ try {
+ ia = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ throw new Exception(host + " is not a valid address", e);
+ }
+
+ if (null == ia || !ia.isMulticastAddress()) {
+ throw new Exception(host + " is not a valid multicast address");
+ }
+
+ final byte[] bytes = (MulticastPulseClient.CLIENT +
forGroup).getBytes(Charset.forName("utf8"));
+ final DatagramPacket request = new DatagramPacket(bytes, bytes.length,
new InetSocketAddress(ia, port));
+
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final MulticastSocket client = MulticastPulseClient.getSocket(ia,
port);
+ final Timer timer = new Timer(true);
final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
@Override
public int compare(URI u1, URI u2) {
+ //Ignore server hostname
+ final String serverHost = u1.getScheme();
+ u1 = URI.create(u1.getSchemeSpecificPart());
+ u2 = URI.create(u2.getSchemeSpecificPart());
+
+ //Ignore scheme (ejb,ejbs,etc.)
u1 = URI.create(u1.getSchemeSpecificPart());
u2 = URI.create(u2.getSchemeSpecificPart());
+ if (u1.getHost().equals(serverHost)) {
+ //If the service host is the same as the server host
+ //then keep it at the top of the list
+ return -1;
+ }
+
+ //Compare URI hosts
int i = u1.getHost().compareTo(u2.getHost());
- if(i == 0){
+ if (i == 0) {
i = u1.compareTo(u2);
}
return i;
}
});
- final byte[] bytes = (MulticastPulseClient.CLIENT +
forGroup).getBytes(Charset.forName("utf8"));
- final InetAddress ia = InetAddress.getByName(host);
- final DatagramPacket request = new DatagramPacket(bytes, bytes.length,
new InetSocketAddress(ia, port));
-
- final AtomicBoolean running = new AtomicBoolean(true);
- final MulticastSocket client = MulticastPulseClient.getSocket(host,
port);
- final Timer timer = new Timer(true);
//Start a thread that listens for multicast packets on our channel.
//This needs to start 'before' we pulse a request.
@@ -139,7 +173,7 @@ public class MulticastPulseClient extend
final SocketAddress sa = response.getSocketAddress();
- if (null != sa) {
+ if (null != sa && (sa instanceof InetSocketAddress)) {
int len = response.getLength();
if (len > 2048) {
@@ -165,7 +199,7 @@ public class MulticastPulseClient extend
continue;
}
- URI test = null;
+ final URI test;
try {
test = URI.create(svc);
} catch (Throwable e) {
@@ -174,7 +208,11 @@ public class MulticastPulseClient extend
if (schemes.contains(test.getScheme())) {
- svc = (group + ":" + svc);
+ //Just because multicast was received
on this host is does not mean the service is on the same
+ //We can however use this to identify
an individual machine and group
+ final String serverHost =
((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress();
+
+ svc = ("mp-" + serverHost + ":" +
group + ":" + svc);
try {
if (svc.contains("0.0.0.0")) {
@@ -206,20 +244,29 @@ public class MulticastPulseClient extend
t.setDaemon(true);
t.start();
- if (running.get()) {
- //Kill the thread after timeout
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- running.set(false);
+ //Kill the thread after timeout
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+
+ running.set(false);
+
+ try {
+ client.leaveGroup(ia);
+ } catch (Throwable e) {
+ //Ignore
+ }
+ try {
client.close();
- t.interrupt();
+ } catch (Throwable e) {
+ //Ignore
}
- }, timeout);
- }
+ t.interrupt();
+ }
+ }, timeout);
//Pulse the server
- final MulticastSocket ms = MulticastPulseClient.getSocket(host, port);
+ final MulticastSocket ms = MulticastPulseClient.getSocket(ia, port);
ms.send(request);
//Wait for thread to die
@@ -238,19 +285,7 @@ public class MulticastPulseClient extend
}
}
- public static MulticastSocket getSocket(final String multicastAddress,
final int port) throws Exception {
-
- final InetAddress ia;
-
- try {
- ia = InetAddress.getByName(multicastAddress);
- } catch (UnknownHostException e) {
- throw new Exception(multicastAddress + " is not a valid address",
e);
- }
-
- if (null == ia || !ia.isMulticastAddress()) {
- throw new Exception(multicastAddress + " is not a valid multicast
address");
- }
+ public static MulticastSocket getSocket(final InetAddress ia, final int
port) throws Exception {
MulticastSocket ms = null;
@@ -259,6 +294,9 @@ public class MulticastPulseClient extend
final NetworkInterface ni =
NetworkInterface.getByInetAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()));
ms.setNetworkInterface(ni);
ms.setSoTimeout(0);
+ if (!ms.getBroadcast()) {
+ ms.setBroadcast(true);
+ }
ms.joinGroup(ia);
} catch (Throwable e) {
Modified:
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1349860&r1=1349859&r2=1349860&view=diff
==============================================================================
---
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
(original)
+++
openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Wed Jun 13 14:06:22 2012
@@ -79,7 +79,6 @@ public class MulticastPulseAgent impleme
this.port = o.get("port", this.port);
this.group = o.get("group", this.group);
-
final InetAddress ia = InetAddress.getByName(this.multicast);
this.address = new InetSocketAddress(ia, this.port);
this.buildPacket();
@@ -297,6 +296,9 @@ public class MulticastPulseAgent impleme
final NetworkInterface ni =
NetworkInterface.getByInetAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()));
ms.setNetworkInterface(ni);
ms.setSoTimeout(0);
+ if (!ms.getBroadcast()) {
+ ms.setBroadcast(true);
+ }
ms.joinGroup(ia);
log.debug(String.format("Created MulticastSocket for '%1$s:%2$s'
on network adapter: %3$s", multicastAddress, port, ni));