Repository: tomee
Updated Branches:
  refs/heads/develop 68e57d7f3 -> 74faacddc


#TOMEE-1500 - MultiPulse bad URI now fires even if ignored


Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/74faacdd
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/74faacdd
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/74faacdd

Branch: refs/heads/develop
Commit: 74faacddcc1f0c86b3a21e998357ed1197323ed2
Parents: 68e57d7
Author: AndyGee <[email protected]>
Authored: Tue Jan 27 18:58:54 2015 +0100
Committer: AndyGee <[email protected]>
Committed: Tue Jan 27 18:58:54 2015 +0100

----------------------------------------------------------------------
 .../server/discovery/MulticastPulseAgent.java   | 103 ++++++++++---------
 .../discovery/MulticastPulseAgentTest.java      |  64 ++++++------
 2 files changed, 87 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tomee/blob/74faacdd/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
----------------------------------------------------------------------
diff --git 
a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
 
b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
index dda21d2..438cd22 100644
--- 
a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
+++ 
b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
@@ -75,9 +75,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
     private final ReentrantLock lock = new ReentrantLock();
     private final Set<String> ignore = Collections.synchronizedSet(new 
HashSet<String>());
     private final Set<URI> uriSet = new HashSet<URI>();
-    private AtomicBoolean running = new AtomicBoolean(false);
+    private final AtomicBoolean running = new AtomicBoolean(false);
     final ArrayList<Future> futures = new ArrayList<Future>();
-    final ArrayList<Future> senders = new ArrayList<Future>();
     private MulticastSocket[] sockets = null;
     private InetSocketAddress address = null;
 
@@ -89,7 +88,6 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
     private boolean loopbackOnly = true;
 
     /**
-     * @author Andy Gumbrecht
      * This agent listens for client pulses on a defined multicast channel.
      * On receipt of a valid pulse the agent responds with its own pulse for
      * a defined amount of time and rate. A client can deliver a pulse as 
often as
@@ -118,7 +116,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
         if (null == executor) {
 
-            int length = getNetworkInterfaces().length;
+            int length = getInterfaces().length;
             if (length < 1) {
                 length = 1;
             }
@@ -145,7 +143,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                     this.ignore.add(s.trim().toLowerCase());
                 }
             }
-        } catch (Exception e) {
+        } catch (final Exception e) {
             log.warning("Invalid ignore parameter. Should be a lowercase 
single host or comma seperated list of hosts to ignore like: 
ignore=host1,host2,ipv4,ipv6");
         }
 
@@ -198,7 +196,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
             if (bytes.length > 2048) {
                 log.warning("MultiPulse packet is larger than 2048 bytes, 
clients will not be able to read the packet" +
-                    "\n - You should define the 'ignore' property to filter 
out unreachable addresses: " + sb);
+                        "\n - You should define the 'ignore' property to 
filter out unreachable addresses: " + sb);
             }
         } finally {
             l.unlock();
@@ -263,13 +261,14 @@ public class MulticastPulseAgent implements 
DiscoveryAgent, ServerService, SelfM
 
     private void fireEvent(final URI uri, final boolean add) {
         if (null != this.listener) {
+            final DiscoveryListener dl = this.listener;
             getExecutorService().execute(new Runnable() {
                 @Override
                 public void run() {
                     if (add) {
-                        MulticastPulseAgent.this.listener.serviceAdded(uri);
+                        dl.serviceAdded(uri);
                     } else {
-                        MulticastPulseAgent.this.listener.serviceRemoved(uri);
+                        dl.serviceRemoved(uri);
                     }
                 }
             });
@@ -282,7 +281,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
             try {
                 this.sockets = getSockets(this.multicast, this.port);
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 throw new ServiceException("Failed to get Multicast sockets", 
e);
             }
 
@@ -290,13 +289,14 @@ public class MulticastPulseAgent implements 
DiscoveryAgent, ServerService, SelfM
             final String mpg = this.group;
             final boolean isLoopBackOnly = this.loopbackOnly;
             final ExecutorService executorService = getExecutorService();
+            final MulticastPulseAgent agent = MulticastPulseAgent.this;
 
             for (final MulticastSocket socket : this.sockets) {
 
                 final String socketKey;
                 try {
                     socketKey = socket.getNetworkInterface().toString();
-                } catch (SocketException e) {
+                } catch (final SocketException e) {
                     log.error("Failed to get network interface name on: " + 
socket, e);
                     continue;
                 }
@@ -311,7 +311,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                         final DatagramPacket request = new DatagramPacket(new 
byte[2048], 2048);
                         latch.countDown();
 
-                        while (MulticastPulseAgent.this.running.get()) {
+                        while (agent.running.get()) {
 
                             try {
                                 socket.receive(request);
@@ -338,24 +338,25 @@ public class MulticastPulseAgent implements 
DiscoveryAgent, ServerService, SelfM
                                         if (mpg.equals(req) || 
"*".equals(req)) {
 
                                             //Is there a bad url and is it 
this agent broadcasting the bad URI?
-                                            if (null != badUri && 
getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) {
-                                                final ReentrantLock l = 
MulticastPulseAgent.this.lock;
-                                                l.lock();
-
-                                                try {
-                                                    //Remove it and rebuild 
our broadcast packet
-                                                    if 
(MulticastPulseAgent.this.ignore.add(badUri)) {
-                                                        
MulticastPulseAgent.this.buildPacket();
-
-                                                        
MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + badUri), 
false);
-
-                                                        log.warning("This 
server has removed the unreachable host '" + badUri + "' from discovery, you 
should consider adding" +
-                                                            " this to the 
'ignore' property in the multipulse.properties file");
+                                            if (null != badUri) {
+                                                if 
(getHosts(agent.ignore).contains(badUri)) {
+                                                    final ReentrantLock l = 
agent.lock;
+                                                    l.lock();
+
+                                                    try {
+                                                        //Remove it and 
rebuild our broadcast packet
+                                                        if 
(agent.ignore.add(badUri)) {
+                                                            
agent.buildPacket();
+                                                            log.warning("This 
server has removed the unreachable host '" + badUri + "' from discovery, you 
should consider adding" +
+                                                                    " this to 
the 'ignore' property in the multipulse.properties file");
+                                                        }
+                                                    } finally {
+                                                        l.unlock();
                                                     }
-
-                                                } finally {
-                                                    l.unlock();
                                                 }
+
+                                                
agent.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false);
+
                                             } else {
 
                                                 //Normal client multicast 
pulse request
@@ -365,8 +366,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                                                     //We only have local 
services, so make sure the request is from a local source else ignore it
                                                     if (log.isDebugEnabled()) {
                                                         
log.debug(String.format("Ignoring remote client %1$s pulse request for group: 
%2$s - No remote services available",
-                                                            client,
-                                                            req));
+                                                                client,
+                                                                req));
                                                     }
                                                 } else {
 
@@ -383,7 +384,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                                     }
                                 }
 
-                            } catch (Exception e) {
+                            } catch (final Exception e) {
                                 if (log.isDebugEnabled()) {
                                     log.debug("MulticastPulseAgent request 
error: " + e.getMessage(), e);
                                 }
@@ -392,7 +393,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
                         try {
                             socket.close();
-                        } catch (Throwable e) {
+                        } catch (final Throwable e) {
                             //Ignore
                         }
                     }
@@ -402,7 +403,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
             try {
                 //Give threads a reasonable amount of time to start
                 latch.await(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
+            } catch (final InterruptedException e) {
                 this.stop();
             }
         }
@@ -417,7 +418,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                 for (final Future future : this.futures) {
                     try {
                         future.cancel(true);
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                 }
@@ -426,7 +427,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                 for (final Future future : this.futures) {
                     try {
                         future.get();
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                 }
@@ -439,7 +440,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                     for (final MulticastSocket s : this.sockets) {
                         try {
                             s.close();
-                        } catch (Throwable e) {
+                        } catch (final Throwable e) {
                             //Ignore
                         }
                     }
@@ -511,7 +512,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
         try {
             ia = InetAddress.getByName(multicastAddress);
-        } catch (UnknownHostException e) {
+        } catch (final UnknownHostException e) {
             throw new ServiceException(multicastAddress + " is not a valid 
address", e);
         }
 
@@ -526,7 +527,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
         final ArrayList<MulticastSocket> list = new 
ArrayList<MulticastSocket>();
 
-        for (final NetworkInterface ni : getNetworkInterfaces()) {
+        for (final NetworkInterface ni : getInterfaces()) {
 
             MulticastSocket ms = null;
 
@@ -545,12 +546,12 @@ public class MulticastPulseAgent implements 
DiscoveryAgent, ServerService, SelfM
 
                 log.debug(String.format("Created MulticastSocket for 
'%1$s:%2$s' on network adapter: %3$s", ia.getHostName(), port, ni));
 
-            } catch (Throwable e) {
+            } catch (final Throwable e) {
 
                 if (null != ms) {
                     try {
                         ms.close();
-                    } catch (Throwable t) {
+                    } catch (final Throwable t) {
                         //Ignore
                     }
                 }
@@ -573,7 +574,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                     list.add(next);
                 }
             }
-        } catch (SocketException e) {
+        } catch (final SocketException e) {
             //Ignore
         }
 
@@ -591,7 +592,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
         final InetAddress addr;
         try {
             addr = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
+        } catch (final UnknownHostException e) {
             return false;
         }
 
@@ -610,7 +611,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
         final InetAddress addr;
         try {
             addr = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
+        } catch (final UnknownHostException e) {
             return false;
         }
 
@@ -622,7 +623,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
         // Check if the address is defined on any interface
         try {
             return NetworkInterface.getByInetAddress(addr) != null;
-        } catch (SocketException e) {
+        } catch (final SocketException e) {
             return false;
         }
     }
@@ -648,7 +649,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                     } else if (0 != h1.compareTo(h2)) {
                         return -1;
                     }
-                } catch (Throwable e) {
+                } catch (final Throwable e) {
                     //Ignore
                 }
 
@@ -660,7 +661,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
             final InetAddress localhost = InetAddress.getLocalHost();
             hosts.add(localhost.getHostAddress());
             //Multi-homed
-            final InetAddress[] all = 
InetAddress.getAllByName(localhost.getCanonicalHostName());
+            final InetAddress[] all = 
InetAddress.getAllByName(localhost.getHostName());
             for (final InetAddress ip : all) {
 
                 if (ip.isLinkLocalAddress() || ip.isMulticastAddress()) {
@@ -670,10 +671,10 @@ public class MulticastPulseAgent implements 
DiscoveryAgent, ServerService, SelfM
                 final String ha = ip.getHostAddress();
                 if (!ha.replace("[", "").startsWith("2001:0:")) { //Filter 
Teredo
                     hosts.add(ha);
-                    hosts.add(ip.getCanonicalHostName());
+                    hosts.add(ip.getHostName());
                 }
             }
-        } catch (UnknownHostException e) {
+        } catch (final UnknownHostException e) {
             log.warning("Failed to list machine hosts", e);
         }
 
@@ -712,7 +713,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
                     try {
                         //Wait indefinitely until we are interrupted or 
notified
                         this.counter.wait();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                         if (!this.agent.running.get()) {
                             break;
                         }
@@ -724,7 +725,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
                     try {
                         this.socket.send(this.agent.getResponsePacket());
-                    } catch (Exception e) {
+                    } catch (final Exception e) {
                         if (log.isDebugEnabled()) {
                             log.debug("MulticastPulseAgent client error: " + 
e.getMessage(), e);
                         }
@@ -732,7 +733,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
 
                     try {
                         Thread.sleep(10);
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                         break;
                     }
                 }
@@ -756,4 +757,4 @@ public class MulticastPulseAgent implements DiscoveryAgent, 
ServerService, SelfM
             return this.socketKey;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tomee/blob/74faacdd/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
----------------------------------------------------------------------
diff --git 
a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
 
b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
index a49d5d4..a9d3aa4 100644
--- 
a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
+++ 
b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
@@ -17,6 +17,7 @@
 package org.apache.openejb.server.discovery;
 
 import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.util.NetworkUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -56,7 +57,7 @@ public class MulticastPulseAgentTest {
     private static final Charset utf8 = Charset.forName("UTF-8");
     private static final String forGroup = "*";
     private static final String host = "239.255.3.2";
-    private static final int port = 6142;
+    private static final int port = NetworkUtil.getNextAvailablePort();
     private static MulticastPulseAgent agent;
 
     @BeforeClass
@@ -100,7 +101,7 @@ public class MulticastPulseAgentTest {
 
         try {
             ia = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
+        } catch (final UnknownHostException e) {
             throw new Exception(host + " is not a valid address", e);
         }
 
@@ -159,7 +160,7 @@ public class MulticastPulseAgentTest {
                     } else if (0 != h1.compareTo(h2)) {
                         return -1;
                     }
-                } catch (Throwable e) {
+                } catch (final Throwable e) {
                     //Ignore
                 }
 
@@ -183,7 +184,7 @@ public class MulticastPulseAgentTest {
                     String name = "Unknown interface";
                     try {
                         name = socket.getNetworkInterface().getDisplayName();
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                     System.out.println("Entered MulticastPulse client thread 
on: " + name);
@@ -199,7 +200,7 @@ public class MulticastPulseAgentTest {
 
                             final SocketAddress sa = 
response.getSocketAddress();
 
-                            if (null != sa && (sa instanceof 
InetSocketAddress)) {
+                            if ((sa instanceof InetSocketAddress)) {
 
                                 int len = response.getLength();
                                 if (len > 2048) {
@@ -225,9 +226,9 @@ public class MulticastPulseAgentTest {
                                     final String[] hosts = s.split(",");
 
                                     System.out.println(String.format("\n" + 
name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: 
%3$s\n",
-                                        group,
-                                        services,
-                                        s));
+                                            group,
+                                            services,
+                                            s));
 
                                     for (final String svc : serviceList) {
 
@@ -238,7 +239,7 @@ public class MulticastPulseAgentTest {
                                         final URI serviceUri;
                                         try {
                                             serviceUri = URI.create(svc);
-                                        } catch (Throwable e) {
+                                        } catch (final Throwable e) {
                                             continue;
                                         }
 
@@ -277,7 +278,7 @@ public class MulticastPulseAgentTest {
                                                     //Just add as is
                                                     
set.add(URI.create(fullsvc));
                                                 }
-                                            } catch (Throwable e) {
+                                            } catch (final Throwable e) {
                                                 //Ignore
                                             } finally {
                                                 setLock.unlock();
@@ -289,7 +290,7 @@ public class MulticastPulseAgentTest {
                                 }
                             }
 
-                        } catch (Throwable e) {
+                        } catch (final Throwable e) {
                             //Ignore
                         }
                     }
@@ -313,7 +314,7 @@ public class MulticastPulseAgentTest {
                 for (final MulticastSocket socket : clientSockets) {
                     try {
                         socket.send(request);
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                 }
@@ -322,7 +323,7 @@ public class MulticastPulseAgentTest {
                 System.out.println("Giving up on threads");
             }
 
-        } catch (InterruptedException e) {
+        } catch (final InterruptedException e) {
             timeout = 1;
         }
 
@@ -336,7 +337,7 @@ public class MulticastPulseAgentTest {
                 for (final Future future : futures) {
                     try {
                         future.cancel(true);
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                 }
@@ -345,12 +346,12 @@ public class MulticastPulseAgentTest {
 
                     try {
                         socket.leaveGroup(ia);
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                     try {
                         socket.close();
-                    } catch (Throwable e) {
+                    } catch (final Throwable e) {
                         //Ignore
                     }
                 }
@@ -361,7 +362,7 @@ public class MulticastPulseAgentTest {
         for (final Future future : futures) {
             try {
                 future.get();
-            } catch (Throwable e) {
+            } catch (final Throwable e) {
                 //Ignore
             }
         }
@@ -416,6 +417,9 @@ public class MulticastPulseAgentTest {
         final String[] hosts = agent.getHosts().split(",");
         final String host = hosts[hosts.length - 1];
 
+        boolean removed = agent.removeFromIgnore(host);
+        org.junit.Assert.assertTrue("Host is already ignored", !removed);
+
         final Future<?> future = executor.submit(new Runnable() {
             @Override
             public void run() {
@@ -427,30 +431,32 @@ public class MulticastPulseAgentTest {
 
                     final MulticastSocket[] multicastSockets = 
MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port);
 
-                    for (final MulticastSocket socket : multicastSockets) {
+                    for (int i = 0; i < 5; i++) {
+                        for (final MulticastSocket socket : multicastSockets) {
 
-                        try {
-                            socket.send(request);
-                        } catch (Exception e) {
-                            System.out.println("Failed to broadcast bad URI 
on: " + socket.getInterface().getHostAddress());
-                            e.printStackTrace();
+                            try {
+                                socket.send(request);
+                                Thread.sleep(100);
+                            } catch (final Exception e) {
+                                System.out.println("Failed to broadcast bad 
URI on: " + socket.getInterface().getHostAddress());
+                                e.printStackTrace();
+                            }
                         }
                     }
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     System.out.println("Failed to broadcast bad URI");
                     e.printStackTrace();
                 }
             }
         });
 
-        final Object o = future.get(10, TimeUnit.SECONDS);
-
         final boolean await = latch.await(20, TimeUnit.SECONDS);
-        final boolean removed = agent.removeFromIgnore(host);
+        removed = agent.removeFromIgnore(host);
 
         agent.setDiscoveryListener(original);
 
-        org.junit.Assert.assertTrue("Failed to remove host", removed && await);
+        org.junit.Assert.assertTrue("Failed to remove host", removed);
+        org.junit.Assert.assertTrue("Failed to unlatch", await);
     }
 
     private String ipFormat(final String h) throws UnknownHostException {
@@ -467,7 +473,7 @@ public class MulticastPulseAgentTest {
         final InetAddress ia;
         try {
             ia = InetAddress.getByName(host);
-        } catch (UnknownHostException e) {
+        } catch (final UnknownHostException e) {
             throw new Exception(host + " is not a valid address", e);
         }
 

Reply via email to