Repository: tomee Updated Branches: refs/heads/tomee-1.7.x efbb99651 -> 2e2031eba
#TOMEE-1500 - MultiPulse bad URI now fires even if ignored Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/2e2031eb Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/2e2031eb Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/2e2031eb Branch: refs/heads/tomee-1.7.x Commit: 2e2031eba781357b48f7f5cc6ff3e14a51353f79 Parents: efbb996 Author: andygumbrecht <[email protected]> Authored: Tue Jan 27 18:18:33 2015 +0100 Committer: andygumbrecht <[email protected]> Committed: Tue Jan 27 18:18:33 2015 +0100 ---------------------------------------------------------------------- .../server/discovery/MulticastPulseAgent.java | 55 ++++++++++---------- .../discovery/MulticastPulseAgentTest.java | 33 +++++++----- 2 files changed, 47 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/2e2031eb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java ---------------------------------------------------------------------- diff --git a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java index 04dc79c..eb91edd 100644 --- a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java +++ b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java @@ -75,9 +75,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM private final ReentrantLock lock = new ReentrantLock(); private final Set<String> ignore = Collections.synchronizedSet(new HashSet<String>()); private final Set<URI> uriSet = new HashSet<URI>(); - private AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean running = new AtomicBoolean(false); final ArrayList<Future> futures = new ArrayList<Future>(); - final ArrayList<Future> senders = new ArrayList<Future>(); private MulticastSocket[] sockets = null; private InetSocketAddress address = null; @@ -89,7 +88,6 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM private boolean loopbackOnly = true; /** - * @author Andy Gumbrecht * This agent listens for client pulses on a defined multicast channel. * On receipt of a valid pulse the agent responds with its own pulse for * a defined amount of time and rate. A client can deliver a pulse as often as @@ -198,7 +196,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM if (bytes.length > 2048) { log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" + - "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb); + "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb); } } finally { l.unlock(); @@ -263,13 +261,14 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM private void fireEvent(final URI uri, final boolean add) { if (null != this.listener) { + final DiscoveryListener dl = this.listener; getExecutorService().execute(new Runnable() { @Override public void run() { if (add) { - MulticastPulseAgent.this.listener.serviceAdded(uri); + dl.serviceAdded(uri); } else { - MulticastPulseAgent.this.listener.serviceRemoved(uri); + dl.serviceRemoved(uri); } } }); @@ -290,6 +289,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final String mpg = this.group; final boolean isLoopBackOnly = this.loopbackOnly; final ExecutorService executorService = getExecutorService(); + final MulticastPulseAgent agent = MulticastPulseAgent.this; for (final MulticastSocket socket : this.sockets) { @@ -311,7 +311,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final DatagramPacket request = new DatagramPacket(new byte[2048], 2048); latch.countDown(); - while (MulticastPulseAgent.this.running.get()) { + while (agent.running.get()) { try { socket.receive(request); @@ -338,24 +338,25 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM if (mpg.equals(req) || "*".equals(req)) { //Is there a bad url and is it this agent broadcasting the bad URI? - if (null != badUri && getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) { - final ReentrantLock l = MulticastPulseAgent.this.lock; - l.lock(); - - try { - //Remove it and rebuild our broadcast packet - if (MulticastPulseAgent.this.ignore.add(badUri)) { - MulticastPulseAgent.this.buildPacket(); - - MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false); - - log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" + - " this to the 'ignore' property in the multipulse.properties file"); + if (null != badUri) { + if (getHosts(agent.ignore).contains(badUri)) { + final ReentrantLock l = agent.lock; + l.lock(); + + try { + //Remove it and rebuild our broadcast packet + if (agent.ignore.add(badUri)) { + agent.buildPacket(); + log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" + + " this to the 'ignore' property in the multipulse.properties file"); + } + } finally { + l.unlock(); } - - } finally { - l.unlock(); } + + agent.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false); + } else { //Normal client multicast pulse request @@ -365,8 +366,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM //We only have local services, so make sure the request is from a local source else ignore it if (log.isDebugEnabled()) { log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available", - client, - req)); + client, + req)); } } else { @@ -660,7 +661,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final InetAddress localhost = InetAddress.getLocalHost(); hosts.add(localhost.getHostAddress()); //Multi-homed - final InetAddress[] all = InetAddress.getAllByName(localhost.getCanonicalHostName()); + final InetAddress[] all = InetAddress.getAllByName(localhost.getHostName()); for (final InetAddress ip : all) { if (ip.isLinkLocalAddress() || ip.isMulticastAddress()) { @@ -670,7 +671,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final String ha = ip.getHostAddress(); if (!ha.replace("[", "").startsWith("2001:0:")) { //Filter Teredo hosts.add(ha); - hosts.add(ip.getCanonicalHostName()); + hosts.add(ip.getHostName()); } } } catch (final UnknownHostException e) { http://git-wip-us.apache.org/repos/asf/tomee/blob/2e2031eb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java ---------------------------------------------------------------------- diff --git a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java index f1567f4..a9d3aa4 100644 --- a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java +++ b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java @@ -200,7 +200,7 @@ public class MulticastPulseAgentTest { final SocketAddress sa = response.getSocketAddress(); - if (null != sa && (sa instanceof InetSocketAddress)) { + if ((sa instanceof InetSocketAddress)) { int len = response.getLength(); if (len > 2048) { @@ -226,9 +226,9 @@ public class MulticastPulseAgentTest { final String[] hosts = s.split(","); System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n", - group, - services, - s)); + group, + services, + s)); for (final String svc : serviceList) { @@ -417,6 +417,9 @@ public class MulticastPulseAgentTest { final String[] hosts = agent.getHosts().split(","); final String host = hosts[hosts.length - 1]; + boolean removed = agent.removeFromIgnore(host); + org.junit.Assert.assertTrue("Host is already ignored", !removed); + final Future<?> future = executor.submit(new Runnable() { @Override public void run() { @@ -428,13 +431,16 @@ public class MulticastPulseAgentTest { final MulticastSocket[] multicastSockets = MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port); - for (final MulticastSocket socket : multicastSockets) { + for (int i = 0; i < 5; i++) { + for (final MulticastSocket socket : multicastSockets) { - try { - socket.send(request); - } catch (final Exception e) { - System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress()); - e.printStackTrace(); + try { + socket.send(request); + Thread.sleep(100); + } catch (final Exception e) { + System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress()); + e.printStackTrace(); + } } } } catch (final Exception e) { @@ -444,14 +450,13 @@ public class MulticastPulseAgentTest { } }); - final Object o = future.get(10, TimeUnit.SECONDS); - final boolean await = latch.await(20, TimeUnit.SECONDS); - final boolean removed = agent.removeFromIgnore(host); + removed = agent.removeFromIgnore(host); agent.setDiscoveryListener(original); - org.junit.Assert.assertTrue("Failed to remove host", removed && await); + org.junit.Assert.assertTrue("Failed to remove host", removed); + org.junit.Assert.assertTrue("Failed to unlatch", await); } private String ipFormat(final String h) throws UnknownHostException {
