Repository: tomee Updated Branches: refs/heads/develop 68e57d7f3 -> 74faacddc
#TOMEE-1500 - MultiPulse bad URI now fires even if ignored Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/74faacdd Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/74faacdd Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/74faacdd Branch: refs/heads/develop Commit: 74faacddcc1f0c86b3a21e998357ed1197323ed2 Parents: 68e57d7 Author: AndyGee <[email protected]> Authored: Tue Jan 27 18:58:54 2015 +0100 Committer: AndyGee <[email protected]> Committed: Tue Jan 27 18:58:54 2015 +0100 ---------------------------------------------------------------------- .../server/discovery/MulticastPulseAgent.java | 103 ++++++++++--------- .../discovery/MulticastPulseAgentTest.java | 64 ++++++------ 2 files changed, 87 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/74faacdd/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java ---------------------------------------------------------------------- diff --git a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java index dda21d2..438cd22 100644 --- a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java +++ b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java @@ -75,9 +75,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM private final ReentrantLock lock = new ReentrantLock(); private final Set<String> ignore = Collections.synchronizedSet(new HashSet<String>()); private final Set<URI> uriSet = new HashSet<URI>(); - private AtomicBoolean running = new AtomicBoolean(false); + private final AtomicBoolean running = new AtomicBoolean(false); final ArrayList<Future> futures = new ArrayList<Future>(); - final ArrayList<Future> senders = new ArrayList<Future>(); private MulticastSocket[] sockets = null; private InetSocketAddress address = null; @@ -89,7 +88,6 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM private boolean loopbackOnly = true; /** - * @author Andy Gumbrecht * This agent listens for client pulses on a defined multicast channel. * On receipt of a valid pulse the agent responds with its own pulse for * a defined amount of time and rate. A client can deliver a pulse as often as @@ -118,7 +116,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM if (null == executor) { - int length = getNetworkInterfaces().length; + int length = getInterfaces().length; if (length < 1) { length = 1; } @@ -145,7 +143,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM this.ignore.add(s.trim().toLowerCase()); } } - } catch (Exception e) { + } catch (final Exception e) { log.warning("Invalid ignore parameter. Should be a lowercase single host or comma seperated list of hosts to ignore like: ignore=host1,host2,ipv4,ipv6"); } @@ -198,7 +196,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM if (bytes.length > 2048) { log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" + - "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb); + "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb); } } finally { l.unlock(); @@ -263,13 +261,14 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM private void fireEvent(final URI uri, final boolean add) { if (null != this.listener) { + final DiscoveryListener dl = this.listener; getExecutorService().execute(new Runnable() { @Override public void run() { if (add) { - MulticastPulseAgent.this.listener.serviceAdded(uri); + dl.serviceAdded(uri); } else { - MulticastPulseAgent.this.listener.serviceRemoved(uri); + dl.serviceRemoved(uri); } } }); @@ -282,7 +281,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { this.sockets = getSockets(this.multicast, this.port); - } catch (Exception e) { + } catch (final Exception e) { throw new ServiceException("Failed to get Multicast sockets", e); } @@ -290,13 +289,14 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final String mpg = this.group; final boolean isLoopBackOnly = this.loopbackOnly; final ExecutorService executorService = getExecutorService(); + final MulticastPulseAgent agent = MulticastPulseAgent.this; for (final MulticastSocket socket : this.sockets) { final String socketKey; try { socketKey = socket.getNetworkInterface().toString(); - } catch (SocketException e) { + } catch (final SocketException e) { log.error("Failed to get network interface name on: " + socket, e); continue; } @@ -311,7 +311,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final DatagramPacket request = new DatagramPacket(new byte[2048], 2048); latch.countDown(); - while (MulticastPulseAgent.this.running.get()) { + while (agent.running.get()) { try { socket.receive(request); @@ -338,24 +338,25 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM if (mpg.equals(req) || "*".equals(req)) { //Is there a bad url and is it this agent broadcasting the bad URI? - if (null != badUri && getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) { - final ReentrantLock l = MulticastPulseAgent.this.lock; - l.lock(); - - try { - //Remove it and rebuild our broadcast packet - if (MulticastPulseAgent.this.ignore.add(badUri)) { - MulticastPulseAgent.this.buildPacket(); - - MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false); - - log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" + - " this to the 'ignore' property in the multipulse.properties file"); + if (null != badUri) { + if (getHosts(agent.ignore).contains(badUri)) { + final ReentrantLock l = agent.lock; + l.lock(); + + try { + //Remove it and rebuild our broadcast packet + if (agent.ignore.add(badUri)) { + agent.buildPacket(); + log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" + + " this to the 'ignore' property in the multipulse.properties file"); + } + } finally { + l.unlock(); } - - } finally { - l.unlock(); } + + agent.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false); + } else { //Normal client multicast pulse request @@ -365,8 +366,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM //We only have local services, so make sure the request is from a local source else ignore it if (log.isDebugEnabled()) { log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available", - client, - req)); + client, + req)); } } else { @@ -383,7 +384,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM } } - } catch (Exception e) { + } catch (final Exception e) { if (log.isDebugEnabled()) { log.debug("MulticastPulseAgent request error: " + e.getMessage(), e); } @@ -392,7 +393,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { socket.close(); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -402,7 +403,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { //Give threads a reasonable amount of time to start latch.await(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { this.stop(); } } @@ -417,7 +418,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM for (final Future future : this.futures) { try { future.cancel(true); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -426,7 +427,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM for (final Future future : this.futures) { try { future.get(); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -439,7 +440,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM for (final MulticastSocket s : this.sockets) { try { s.close(); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -511,7 +512,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { ia = InetAddress.getByName(multicastAddress); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { throw new ServiceException(multicastAddress + " is not a valid address", e); } @@ -526,7 +527,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>(); - for (final NetworkInterface ni : getNetworkInterfaces()) { + for (final NetworkInterface ni : getInterfaces()) { MulticastSocket ms = null; @@ -545,12 +546,12 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM log.debug(String.format("Created MulticastSocket for '%1$s:%2$s' on network adapter: %3$s", ia.getHostName(), port, ni)); - } catch (Throwable e) { + } catch (final Throwable e) { if (null != ms) { try { ms.close(); - } catch (Throwable t) { + } catch (final Throwable t) { //Ignore } } @@ -573,7 +574,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM list.add(next); } } - } catch (SocketException e) { + } catch (final SocketException e) { //Ignore } @@ -591,7 +592,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final InetAddress addr; try { addr = InetAddress.getByName(host); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { return false; } @@ -610,7 +611,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final InetAddress addr; try { addr = InetAddress.getByName(host); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { return false; } @@ -622,7 +623,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM // Check if the address is defined on any interface try { return NetworkInterface.getByInetAddress(addr) != null; - } catch (SocketException e) { + } catch (final SocketException e) { return false; } } @@ -648,7 +649,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM } else if (0 != h1.compareTo(h2)) { return -1; } - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } @@ -660,7 +661,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final InetAddress localhost = InetAddress.getLocalHost(); hosts.add(localhost.getHostAddress()); //Multi-homed - final InetAddress[] all = InetAddress.getAllByName(localhost.getCanonicalHostName()); + final InetAddress[] all = InetAddress.getAllByName(localhost.getHostName()); for (final InetAddress ip : all) { if (ip.isLinkLocalAddress() || ip.isMulticastAddress()) { @@ -670,10 +671,10 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM final String ha = ip.getHostAddress(); if (!ha.replace("[", "").startsWith("2001:0:")) { //Filter Teredo hosts.add(ha); - hosts.add(ip.getCanonicalHostName()); + hosts.add(ip.getHostName()); } } - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { log.warning("Failed to list machine hosts", e); } @@ -712,7 +713,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { //Wait indefinitely until we are interrupted or notified this.counter.wait(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { if (!this.agent.running.get()) { break; } @@ -724,7 +725,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { this.socket.send(this.agent.getResponsePacket()); - } catch (Exception e) { + } catch (final Exception e) { if (log.isDebugEnabled()) { log.debug("MulticastPulseAgent client error: " + e.getMessage(), e); } @@ -732,7 +733,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM try { Thread.sleep(10); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { break; } } @@ -756,4 +757,4 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM return this.socketKey; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tomee/blob/74faacdd/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java ---------------------------------------------------------------------- diff --git a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java index a49d5d4..a9d3aa4 100644 --- a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java +++ b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java @@ -17,6 +17,7 @@ package org.apache.openejb.server.discovery; import org.apache.openejb.server.DiscoveryListener; +import org.apache.openejb.util.NetworkUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -56,7 +57,7 @@ public class MulticastPulseAgentTest { private static final Charset utf8 = Charset.forName("UTF-8"); private static final String forGroup = "*"; private static final String host = "239.255.3.2"; - private static final int port = 6142; + private static final int port = NetworkUtil.getNextAvailablePort(); private static MulticastPulseAgent agent; @BeforeClass @@ -100,7 +101,7 @@ public class MulticastPulseAgentTest { try { ia = InetAddress.getByName(host); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { throw new Exception(host + " is not a valid address", e); } @@ -159,7 +160,7 @@ public class MulticastPulseAgentTest { } else if (0 != h1.compareTo(h2)) { return -1; } - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } @@ -183,7 +184,7 @@ public class MulticastPulseAgentTest { String name = "Unknown interface"; try { name = socket.getNetworkInterface().getDisplayName(); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } System.out.println("Entered MulticastPulse client thread on: " + name); @@ -199,7 +200,7 @@ public class MulticastPulseAgentTest { final SocketAddress sa = response.getSocketAddress(); - if (null != sa && (sa instanceof InetSocketAddress)) { + if ((sa instanceof InetSocketAddress)) { int len = response.getLength(); if (len > 2048) { @@ -225,9 +226,9 @@ public class MulticastPulseAgentTest { final String[] hosts = s.split(","); System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n", - group, - services, - s)); + group, + services, + s)); for (final String svc : serviceList) { @@ -238,7 +239,7 @@ public class MulticastPulseAgentTest { final URI serviceUri; try { serviceUri = URI.create(svc); - } catch (Throwable e) { + } catch (final Throwable e) { continue; } @@ -277,7 +278,7 @@ public class MulticastPulseAgentTest { //Just add as is set.add(URI.create(fullsvc)); } - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } finally { setLock.unlock(); @@ -289,7 +290,7 @@ public class MulticastPulseAgentTest { } } - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -313,7 +314,7 @@ public class MulticastPulseAgentTest { for (final MulticastSocket socket : clientSockets) { try { socket.send(request); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -322,7 +323,7 @@ public class MulticastPulseAgentTest { System.out.println("Giving up on threads"); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { timeout = 1; } @@ -336,7 +337,7 @@ public class MulticastPulseAgentTest { for (final Future future : futures) { try { future.cancel(true); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -345,12 +346,12 @@ public class MulticastPulseAgentTest { try { socket.leaveGroup(ia); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } try { socket.close(); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -361,7 +362,7 @@ public class MulticastPulseAgentTest { for (final Future future : futures) { try { future.get(); - } catch (Throwable e) { + } catch (final Throwable e) { //Ignore } } @@ -416,6 +417,9 @@ public class MulticastPulseAgentTest { final String[] hosts = agent.getHosts().split(","); final String host = hosts[hosts.length - 1]; + boolean removed = agent.removeFromIgnore(host); + org.junit.Assert.assertTrue("Host is already ignored", !removed); + final Future<?> future = executor.submit(new Runnable() { @Override public void run() { @@ -427,30 +431,32 @@ public class MulticastPulseAgentTest { final MulticastSocket[] multicastSockets = MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port); - for (final MulticastSocket socket : multicastSockets) { + for (int i = 0; i < 5; i++) { + for (final MulticastSocket socket : multicastSockets) { - try { - socket.send(request); - } catch (Exception e) { - System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress()); - e.printStackTrace(); + try { + socket.send(request); + Thread.sleep(100); + } catch (final Exception e) { + System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress()); + e.printStackTrace(); + } } } - } catch (Exception e) { + } catch (final Exception e) { System.out.println("Failed to broadcast bad URI"); e.printStackTrace(); } } }); - final Object o = future.get(10, TimeUnit.SECONDS); - final boolean await = latch.await(20, TimeUnit.SECONDS); - final boolean removed = agent.removeFromIgnore(host); + removed = agent.removeFromIgnore(host); agent.setDiscoveryListener(original); - org.junit.Assert.assertTrue("Failed to remove host", removed && await); + org.junit.Assert.assertTrue("Failed to remove host", removed); + org.junit.Assert.assertTrue("Failed to unlatch", await); } private String ipFormat(final String h) throws UnknownHostException { @@ -467,7 +473,7 @@ public class MulticastPulseAgentTest { final InetAddress ia; try { ia = InetAddress.getByName(host); - } catch (UnknownHostException e) { + } catch (final UnknownHostException e) { throw new Exception(host + " is not a valid address", e); }
