Author: dblevins Date: Mon Nov 1 02:16:47 2010 New Revision: 1029532 URL: http://svn.apache.org/viewvc?rev=1029532&view=rev Log: svn merge -r 1027695:1027696 https://svn.apache.org/repos/asf/openejb/branches/openejb-3.1.x
http://svn.apache.org/viewvc?view=revision&revision=1027696 ------------------------------------------------------------------------ r1027696 | dblevins | 2010-10-26 12:27:19 -0700 (Tue, 26 Oct 2010) | 3 lines OPENEJB-1386: Multipoint discovery issue leading to ignored heartbeat OPENEJB-1387: JMX DiscoverRegistry MBean to monitor services broadcast over multicast and multipoint ------------------------------------------------------------------------ Added: openejb/trunk/openejb3/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java - copied unchanged from r1027696, openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java Modified: openejb/trunk/openejb3/ (props changed) openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java (props changed) openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml (props changed) openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Propchange: openejb/trunk/openejb3/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Nov 1 02:16:47 2010 @@ -1,3 +1,3 @@ /openejb/branches/openejb-3.1.1:779593 -/openejb/branches/openejb-3.1.x:945409,945448,1004381,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754 +/openejb/branches/openejb-3.1.x:945409,945448,1004381,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754 /openejb/branches/openejb-jcdi:984659-985270 Propchange: openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Nov 1 02:16:47 2010 @@ -1,3 +1,3 @@ /openejb/branches/openejb-3.1.1/container/openejb-core/src/test/java/org/apache/openejb/config/UberInterfaceTest.java:779593 -/openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754 +/openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754 /openejb/branches/openejb-jcdi/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:984659-985270 Propchange: openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Mon Nov 1 02:16:47 2010 @@ -1,3 +1,3 @@ /openejb/branches/openejb-3.1.1/examples/alternate-descriptors/src/main/resources/META-INF/ejb-jar.xml:779593 -/openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754 +/openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754 /openejb/branches/openejb-jcdi/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:984659-985270 Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1029532&r1=1029531&r2=1029532&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original) +++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Mon Nov 1 02:16:47 2010 @@ -28,17 +28,10 @@ import org.apache.openejb.loader.Options import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.DatagramPacket; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; import java.net.Socket; -import java.net.SocketTimeoutException; import java.net.URI; import java.util.Properties; import java.util.StringTokenizer; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -126,6 +119,8 @@ public class MultipointDiscoveryAgent im multipointServer = new MultipointServer(host, port, tracker).start(); + this.port = multipointServer.getPort(); + // Connect the initial set of peer servers StringTokenizer st = new StringTokenizer(initialServers, ","); while (st.hasMoreTokens()) { @@ -134,7 +129,7 @@ public class MultipointDiscoveryAgent im } } catch (Exception e) { - throw new ServiceException(e); + throw new ServiceException(port+"", e); } } Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1029532&r1=1029531&r2=1029532&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original) +++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Mon Nov 1 02:16:47 2010 @@ -27,6 +27,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.URI; import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -50,6 +51,8 @@ import java.util.concurrent.atomic.Atomi public class MultipointServer { private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class); + private static final URI END_LIST = URI.create("end:list"); + private final String host; private final int port; private final Selector selector; @@ -67,9 +70,7 @@ public class MultipointServer { public MultipointServer(String host, int port, Tracker tracker) throws IOException { if (tracker == null) throw new NullPointerException("tracker cannot be null"); this.host = host; - this.port = port; this.tracker = tracker; - me = URI.create("conn://" + host + ":" + port); ServerSocketChannel serverChannel = ServerSocketChannel.open(); @@ -78,6 +79,9 @@ public class MultipointServer { serverSocket.bind(address); serverChannel.configureBlocking(false); + this.port = serverSocket.getLocalPort(); + me = URI.create("conn://" + this.host + ":" + this.port); + selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); @@ -85,6 +89,10 @@ public class MultipointServer { println("Listening"); } + public int getPort() { + return port; + } + public MultipointServer start() { if (running.compareAndSet(false, true)) { Thread thread = new Thread(new Runnable() { @@ -130,6 +138,7 @@ public class MultipointServer { } public void state(int ops, State state) { +// trace("transition "+state +" "+ops); this.state = state; if (ops > 0) key.interestOps(ops); } @@ -146,6 +155,14 @@ public class MultipointServer { } } + private void info(String str) { +// println(message(str)); + + if (log.isInfoEnabled()) { + log.info(message(str)); + } + } + private String message(String str) { StringBuilder sb = new StringBuilder(); sb.append(port); @@ -242,7 +259,12 @@ public class MultipointServer { } private void heartbeat() throws IOException { - write(tracker.getRegisteredServices()); + + final Set<String> strings = tracker.getRegisteredServices(); + for (String string : strings) { + trace(string); + } + write(strings); state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT); tracker.checkServices(); @@ -280,7 +302,7 @@ public class MultipointServer { // address of the client before sending data. // once they send us their address, we will send our - // full list of known addresses, followed by our own + // full list of known addresses, followed by the "end" // address to signal that we are done. // Afterward we will only pulls our heartbeat @@ -308,14 +330,18 @@ public class MultipointServer { // before accepting data // once a server reads our address, it will send it's - // full list of known addresses, followed by it's own + // full list of known addresses, followed by the "end" // address to signal that it is done. - // we will initiate connections to everyone in the list - // who we have not yet seen. + // we will then send our full list of known addresses, + // followed by the "end" address to signal we are done. // Afterward the server will only pulls its heartbeat + // separately, we will initiate connections to everyone + // in the list who we have not yet seen. + + // WRITE our GREETING session.write(me); session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING); @@ -329,6 +355,20 @@ public class MultipointServer { switch (session.state) { case GREETING: { // read + // This state is only reachable as a SERVER + // The client connected and said hello by sending + // its URI to let us know who they are + + // Once this is read, the client will expect us + // to send our full list of URIs followed by the + // "end" address. + + // So we switch to WRITE LISTING and they switch + // to READ LISTING + + // Then we will switch to READ LISTING and they + // will switch to WRITE LISTING + String message = session.read(); if (message == null) break; // need to read more @@ -345,8 +385,7 @@ public class MultipointServer { // they'll know it's time to list their URIs list.remove(me); // yank - list.remove(session.uri); // yank - list.add(session.uri); // add to the end + list.add(END_LIST); // add to the end session.write(list); @@ -364,42 +403,58 @@ public class MultipointServer { while ((message = session.read()) != null) { + session.trace(message); + URI uri = URI.create(message); - session.listed.add(uri); + if (END_LIST.equals(uri)) { - session.trace(message); + if (session.client) { - // they listed me, means they want my list - if (uri.equals(me)) { - ArrayList<URI> list = connections(); + ArrayList<URI> list = connections(); - for (URI reported : session.listed) { - list.remove(reported); - } + for (URI reported : session.listed) { + list.remove(reported); + } - // When they read us on the list - // they'll know it's time to switch to heartbeat + // When they read us on the list + // they'll know it's time to switch to heartbeat - list.remove(session.uri); - list.remove(me); // yank if in the middle - list.add(me); // add to the end + list.remove(session.uri); + list.add(END_LIST); + + session.write(list); - session.write(list); + session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING); - session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING); + } else { - } else if (uri.equals(session.uri)) { + // We are a SERVER in this relationship, so we will have already + // listed our known peers by this point. From here we switch to + // heartbeating + + // heartbeat time + if (session.hangup) { + session.state(0, State.CLOSED); + session.trace("hangup"); + hangup(key); + + } else { + + session.trace("DONE READING"); + + session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT); + + } - if (session.hangup) { - session.state(0, State.CLOSED); - session.trace("hangup"); - hangup(key); - } else { - session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT); } + break; + } else { + + session.listed.add(uri); + try { connect(uri); } catch (Exception e) { @@ -431,6 +486,10 @@ public class MultipointServer { switch (session.state) { case GREETING: { // write + // Only CLIENTs write a GREETING message + // As we are a client, the first thing we do + // is READ the server's LIST + if (session.drain()) { session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING); } @@ -442,16 +501,20 @@ public class MultipointServer { if (session.drain()) { - // we haven't ready any URIs yet - if (session.listed.size() == 0) { - - session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING); + if (session.client) { + // CLIENTs list last, so at this point we've read + // the server's list and have written ours + + session.trace("DONE WRITING"); + session.state(SelectionKey.OP_READ, State.HEARTBEAT); + } else { + // SERVERs always write their list first, so at this + // point we switch to LIST READ mode - session.trace("DONE"); + session.state(SelectionKey.OP_READ, State.LISTING); - session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT); } } } @@ -474,6 +537,13 @@ public class MultipointServer { } } + } catch (CancelledKeyException ex) { + synchronized (connect) { + Session session = (Session) key.attachment(); + if (session.state != State.CLOSED) { + close(key); + } + } } catch (ClosedChannelException ex) { synchronized (connect) { Session session = (Session) key.attachment(); @@ -493,7 +563,7 @@ public class MultipointServer { Session session = (Session) key.attachment(); try { - if (session != null) session.tick(); + if (session != null && session.state == State.HEARTBEAT) session.tick(); } catch (IOException ex) { close(key); } @@ -570,7 +640,6 @@ public class MultipointServer { key.channel().close(); } catch (IOException cex) { } - } @@ -644,8 +713,8 @@ public class MultipointServer { session = sessions[0]; duplicate = sessions[1]; - session.trace(session + "@" + session.hashCode() + " KEEP"); - duplicate.trace(duplicate + "@" + duplicate.hashCode() + " KILL"); + session.info(session + "@" + session.hashCode() + " KEEP"); + duplicate.info(duplicate + "@" + duplicate.hashCode() + " KILL"); duplicate.hangup = true; } Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=1029532&r1=1029531&r2=1029532&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (original) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Mon Nov 1 02:16:47 2010 @@ -17,6 +17,7 @@ package org.apache.openejb.server; import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.monitoring.Managed; import java.net.URI; import java.util.List; @@ -44,6 +45,9 @@ public class DiscoveryRegistry implement private final Map<String, URI> services = new ConcurrentHashMap<String, URI>(); private final Map<String, URI> registered = new ConcurrentHashMap<String, URI>(); + @Managed + private final Monitor monitor = new Monitor(); + private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { public Thread newThread(Runnable runable) { Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName()); @@ -125,7 +129,7 @@ public class DiscoveryRegistry implement } public void serviceRemoved(URI service) { - + services.remove(service.toString()); for (final DiscoveryListener discoveryListener : getListeners()) { executor.execute(new ServiceRemovedTask(discoveryListener, service)); } @@ -168,4 +172,30 @@ public class DiscoveryRegistry implement } } } + + @Managed + private class Monitor { + + @Managed + public String[] getDiscovered() { + final Set<String> set = DiscoveryRegistry.this.services.keySet(); + return set.toArray(new String[set.size()]); + } + + @Managed + public String[] getRegistered() { + final Set<String> set = DiscoveryRegistry.this.registered.keySet(); + return set.toArray(new String[set.size()]); + } + + @Managed + public String[] getAgents() { + List<String> list = new ArrayList<String>(); + for (DiscoveryAgent agent : DiscoveryRegistry.this.agents) { + list.add(agent.getClass().getName()); + } + return list.toArray(new String[list.size()]); + } + + } } Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1029532&r1=1029531&r2=1029532&view=diff ============================================================================== --- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java (original) +++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Mon Nov 1 02:16:47 2010 @@ -17,6 +17,7 @@ package org.apache.openejb.server; import java.io.IOException; +import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.util.Iterator; import java.util.List; @@ -25,8 +26,13 @@ import java.util.Properties; import org.apache.openejb.loader.SystemInstance; +import org.apache.openejb.monitoring.ManagedMBean; +import org.apache.openejb.monitoring.ObjectNameBuilder; import org.apache.xbean.finder.ResourceFinder; +import javax.management.MBeanServer; +import javax.management.ObjectName; + /** * @version $Rev$ $Date$ * @org.apache.xbean.XBean element="simpleServiceManager" @@ -107,6 +113,22 @@ public class SimpleServiceManager extend } DiscoveryRegistry registry = new DiscoveryRegistry(); + + // register the mbean + try { + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + + ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb"); + jmxName.set("type", "Server"); + jmxName.set("name", "DiscoveryRegistry"); + + ObjectName objectName = jmxName.build(); + server.registerMBean(new ManagedMBean(registry), objectName); + } catch (Exception e) { + logger.error("Unable to register MBean ", e); + } + + SystemInstance.get().setComponent(DiscoveryRegistry.class, registry); ServiceFinder serviceFinder = new ServiceFinder("META-INF/");
