Hi Andy, Did you miss a commit? The build is broken. It looks like the versions are missing.
[]s, Thiago. On Tue, Dec 10, 2013 at 11:54 AM, <andygumbre...@apache.org> wrote: > Author: andygumbrecht > Date: Tue Dec 10 16:54:47 2013 > New Revision: 1549891 > > URL: http://svn.apache.org/r1549891 > Log: > Improve MulticastPulseAgent/MulticastPulseClient by allowing client to > notify server of unreachable hosts - Eventually a server will stop sending > them out. > Pull some more <version> tags up to parent pom dep-management where they > should all be. > Allow server project to build from it's own directory. > Finals. > > Modified: > > tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java > tomee/tomee/trunk/server/openejb-common-cli/pom.xml > > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java > > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java > > tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java > > tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java > > tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java > tomee/tomee/trunk/server/openejb-server/pom.xml > > tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java > > tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java > > Modified: > tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java > (original) > +++ > tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java > Tue Dec 10 16:54:47 2013 > @@ -64,6 +64,7 @@ public class MulticastPulseClient extend > private static final Logger log = Logger.getLogger("OpenEJB.client"); > private static final String SERVER = "OpenEJB.MCP.Server:"; > private static final String CLIENT = "OpenEJB.MCP.Client:"; > + private static final String BADURI = ":BadUri:"; > private static final String EMPTY = "NoService"; > private static final Charset UTF8 = Charset.forName("UTF-8"); > private static final int TTL = > Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, > "32")); > @@ -118,12 +119,7 @@ public class MulticastPulseClient extend > > if (null == uriSet || uriSet.isEmpty()) { > > - final Map<String, String> params; > - try { > - params = URIs.parseParamters(uri); > - } catch (URISyntaxException e) { > - throw new IllegalArgumentException("Invalid MultiPulse > uri " + uri.toString(), e); > - } > + final Map<String, String> params = getUriParameters(uri); > > final Set<String> schemes = getSet(params, "schemes", > this.getDefaultSchemes()); > final String group = getString(params, "group", "default"); > @@ -140,17 +136,39 @@ public class MulticastPulseClient extend > > for (final URI serviceURI : uriSet) { > > + //Strip serverhost and group and try to connect > + final URI tryUri = > URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()); > + > try { > - //Strip serverhost and group and try to connect > - return > ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart())); > + return ConnectionManager.getConnection(tryUri); > } catch (Exception e) { > + > uriSet.remove(serviceURI); > + > + if (java.net.SocketTimeoutException.class.isInstance(e) > || SocketException.class.isInstance(e)) { > + //Notify server that this URI is not reachable > + > MulticastPulseClient.broadcastBadUri(getString(getUriParameters(uri), > "group", "default"), tryUri, uri.getHost(), uri.getPort()); > + } > + > + if (log.isLoggable(Level.FINE)) { > + log.fine("Failed connection to: " + serviceURI); > + } > } > } > > throw new IOException("Unable to connect an ejb server via the > MultiPulse URI: " + uri); > } > > + private static Map<String, String> getUriParameters(final URI uri) { > + final Map<String, String> params; > + try { > + params = URIs.parseParamters(uri); > + } catch (URISyntaxException e) { > + throw new IllegalArgumentException("Invalid MultiPulse uri " > + uri.toString(), e); > + } > + return params; > + } > + > /** > * Get a list of URIs discovered for the provided request. > * <p/> > @@ -187,17 +205,7 @@ public class MulticastPulseClient extend > throw new Exception("Specify a valid port between 1 and > 65535"); > } > > - final InetAddress ia; > - > - try { > - ia = InetAddress.getByName(host); > - } catch (UnknownHostException e) { > - throw new Exception(host + " is not a valid address", e); > - } > - > - if (null == ia || !ia.isMulticastAddress()) { > - throw new Exception(host + " is not a valid multicast > address"); > - } > + final InetAddress ia = getAddress(host); > > final byte[] bytes = (MulticastPulseClient.CLIENT + > forGroup).getBytes(UTF8); > final DatagramPacket request = new DatagramPacket(bytes, > bytes.length, new InetSocketAddress(ia, port)); > @@ -492,6 +500,20 @@ public class MulticastPulseClient extend > } > } > > + private static InetAddress getAddress(final String host) throws > Exception { > + final InetAddress ia; > + try { > + ia = InetAddress.getByName(host); > + } catch (UnknownHostException e) { > + throw new Exception(host + " is not a valid address", e); > + } > + > + if (null == ia || !ia.isMulticastAddress()) { > + throw new Exception(host + " is not a valid multicast > address"); > + } > + return ia; > + } > + > /** > * Is the provided host a local host > * > @@ -681,7 +703,10 @@ public class MulticastPulseClient extend > s.connect(new InetSocketAddress(host, > port), st); > b = true; > } catch (Exception e) { > - //Ignore > + if > (java.net.SocketTimeoutException.class.isInstance(e) || > SocketException.class.isInstance(e)) { > + > MulticastPulseClient.broadcastBadUri(group, uriSub, mchost, mcport); > + System.out.print("" + e + " : "); > + } > } finally { > try { > s.close(); > @@ -716,4 +741,38 @@ public class MulticastPulseClient extend > running.set(false); > t.interrupt(); > } > + > + /** > + * Asynchronous attempt to broadcast a bad URI on our channel. > + * Hopefully the culprit server will hear this and stop sending it. > + * > + * @param uri Bad URI to broadcast > + */ > + private static void broadcastBadUri(final String group, final URI > uri, final String host, final int port) { > + > + getExecutorService().submit(new Runnable() { > + @Override > + public void run() { > + try { > + final InetAddress ia = getAddress(host); > + > + final byte[] bytes = (MulticastPulseClient.CLIENT + > group + MulticastPulseClient.BADURI + uri.getHost()).getBytes(UTF8); > + final DatagramPacket request = new > DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port)); > + > + final MulticastSocket[] multicastSockets = > MulticastPulseClient.getSockets(ia, port); > + > + for (final MulticastSocket socket : multicastSockets) > { > + > + try { > + socket.send(request); > + } catch (Exception e) { > + log.log(Level.WARNING, "Failed to broadcast > bad URI: " + uri + " on: " + socket.getInterface().getHostAddress(), e); > + } > + } > + } catch (Exception e) { > + log.log(Level.WARNING, "Failed to broadcast bad URI: > " + uri, e); > + } > + } > + }); > + } > } > > Modified: tomee/tomee/trunk/server/openejb-common-cli/pom.xml > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- tomee/tomee/trunk/server/openejb-common-cli/pom.xml (original) > +++ tomee/tomee/trunk/server/openejb-common-cli/pom.xml Tue Dec 10 > 16:54:47 2013 > @@ -36,20 +36,38 @@ > <version>${project.version}</version> > </dependency> > <dependency> > + <groupId>org.apache.openejb</groupId> > + <artifactId>openejb-loader</artifactId> > + <version>${project.version}</version> > + </dependency> > + <dependency> > <groupId>jline</groupId> > <artifactId>jline</artifactId> > - <version>0.9.94</version> > </dependency> > <dependency> > <groupId>com.google.code.gson</groupId> > <artifactId>gson</artifactId> > - <version>2.1</version> > </dependency> > <dependency> > + <groupId>org.apache.xbean</groupId> > + <artifactId>xbean-reflect</artifactId> > + </dependency> > + <dependency> > + <groupId>org.apache.xbean</groupId> > + <artifactId>xbean-finder-shaded</artifactId> > + </dependency> > + <dependency> > + <groupId>org.apache.commons</groupId> > + <artifactId>commons-lang3</artifactId> > + </dependency> > + > + <!-- Test scope --> > + <dependency> > <groupId>org.codehaus.groovy</groupId> > <artifactId>groovy-all</artifactId> > - <version>2.1.0</version> > - <scope>test</scope> <!-- don't deliver it --> > + <scope>test</scope> > </dependency> > + > + > </dependencies> > </project> > > Modified: > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java > (original) > +++ > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/CliRunnable.java > Tue Dec 10 16:54:47 2013 > @@ -42,6 +42,7 @@ import java.util.Properties; > import java.util.TreeMap; > > public class CliRunnable implements Runnable { > + > private static final Logger LOGGER = > Logger.getInstance(LogCategory.OPENEJB_SERVER, CliRunnable.class); > > private static final String BRANDING_FILE = "branding.properties"; > @@ -87,8 +88,11 @@ public class CliRunnable implements Runn > UrlSet urlSet = new UrlSet(loader).excludeJvm(); > urlSet = urlSet.exclude(loader.getParent()); > > - final IAnnotationFinder finder = new AnnotationFinder(new > ConfigurableClasspathArchive(new > ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP), > true, urlSet.getUrls())); > - for (Annotated<Class<?>> cmd : > finder.findMetaAnnotatedClasses(Command.class)) { > + //noinspection unchecked > + final IAnnotationFinder finder = new AnnotationFinder(new > ConfigurableClasspathArchive(new > ConfigurableClasspathArchive.FakeModule(loader, Collections.EMPTY_MAP), > + > true, > + > urlSet.getUrls())); > + for (final Annotated<Class<?>> cmd : > finder.findMetaAnnotatedClasses(Command.class)) { > try { > final Command annotation = > cmd.getAnnotation(Command.class); > final String key = annotation.name(); > @@ -113,15 +117,15 @@ public class CliRunnable implements Runn > private OutputStream err; > private OutputStream out; > private InputStream sin; > - private String username; > - private String bind; > - private int port; > + private final String username; > + private final String bind; > + private final int port; > > - public CliRunnable(String bind, int port) { > + public CliRunnable(final String bind, final int port) { > this(bind, port, PROMPT, null); > } > > - public CliRunnable(String bind, int port, String username, String > sep) { > + public CliRunnable(final String bind, final int port, final String > username, final String sep) { > this.bind = bind; > this.port = port; > this.username = username; > @@ -144,15 +148,15 @@ public class CliRunnable implements Runn > } > } > > - public void setInputStream(InputStream in) { > + public void setInputStream(final InputStream in) { > sin = in; > } > > - public void setOutputStream(OutputStream out) { > + public void setOutputStream(final OutputStream out) { > this.out = out; > } > > - public void setErrorStream(OutputStream err) { > + public void setErrorStream(final OutputStream err) { > this.err = err; > } > > @@ -165,9 +169,10 @@ public class CliRunnable implements Runn > } > > public void clean() { > - scripter.clearEngines(); > + OpenEJBScripter.clearEngines(); > } > > + @Override > public void run() { > clean(); > > @@ -180,13 +185,13 @@ public class CliRunnable implements Runn > // TODO : add completers > > String line; > - StringBuilder builtWelcome = new StringBuilder("Apache > OpenEJB ") > - .append(OpenEjbVersion.get().getVersion()) > - .append(" build: ") > - .append(OpenEjbVersion.get().getDate()) > - .append("-") > - .append(OpenEjbVersion.get().getTime()) > - .append(lineSep); > + final StringBuilder builtWelcome = new StringBuilder("Apache > OpenEJB ") > + > .append(OpenEjbVersion.get().getVersion()) > + .append(" build: ") > + > .append(OpenEjbVersion.get().getDate()) > + .append("-") > + > .append(OpenEjbVersion.get().getTime()) > + .append(lineSep); > if (tomee) { > > > builtWelcome.append(OS_LINE_SEP).append(PROPERTIES.getProperty(WELCOME_TOMEE_KEY)); > } else { > @@ -196,10 +201,10 @@ public class CliRunnable implements Runn > > streamManager.writeOut(OpenEjbVersion.get().getUrl()); > streamManager.writeOut(builtWelcome.toString() > - .replace("$bind", bind) > - .replace("$port", Integer.toString(port)) > - .replace("$name", NAME) > - .replace(OS_LINE_SEP, lineSep)); > + .replace("$bind", bind) > + .replace("$port", > Integer.toString(port)) > + .replace("$name", NAME) > + .replace(OS_LINE_SEP, > lineSep)); > > while ((line = reader.readLine(prompt())) != null) { > // exit simply let us go out of the loop > @@ -210,7 +215,7 @@ public class CliRunnable implements Runn > > Class<?> cmdClass = null; > String key = null; > - for (Map.Entry<String, Class<?>> cmd : > COMMANDS.entrySet()) { > + for (final Map.Entry<String, Class<?>> cmd : > COMMANDS.entrySet()) { > if (line.startsWith(cmd.getKey())) { > cmdClass = cmd.getValue(); > key = cmd.getKey(); > @@ -270,8 +275,8 @@ public class CliRunnable implements Runn > prompt.append(PROMPT); > } > prompt.append(" @ ") > - .append(bind).append(":").append(port) > - .append(PROMPT_SUFFIX); > + .append(bind).append(":").append(port) > + .append(PROMPT_SUFFIX); > return prompt.toString(); > } > } > > Modified: > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java > (original) > +++ > tomee/tomee/trunk/server/openejb-common-cli/src/main/java/org/apache/openejb/server/cli/StreamManager.java > Tue Dec 10 16:54:47 2013 > @@ -26,6 +26,7 @@ import java.io.OutputStreamWriter; > import java.util.Collection; > > public class StreamManager { > + > private static final String OS_LINE_SEP = > System.getProperty("line.separator"); > > private String lineSep; > @@ -34,16 +35,16 @@ public class StreamManager { > private OutputStream out; > private OutputStream err; > > - public StreamManager(OutputStream out, OutputStream err, String > lineSep) { > + public StreamManager(final OutputStream out, final OutputStream err, > final String lineSep) { > this.lineSep = lineSep; > this.out = out; > this.err = err; > this.sout = new OutputStreamWriter(out); > - this.serr= new OutputStreamWriter(err); > + this.serr = new OutputStreamWriter(err); > } > > private void write(final OutputStreamWriter writer, final String s) { > - for (String l : s.split(lineSep)) { > + for (final String l : s.split(lineSep)) { > try { > writer.write(l); > writer.write(lineSep); > @@ -64,7 +65,7 @@ public class StreamManager { > } else { > final StringBuilder error = new StringBuilder(); > error.append(e.getMessage()).append(lineSep); > - for (StackTraceElement elt : e.getStackTrace()) { > + for (final StackTraceElement elt : e.getStackTrace()) { > error.append(" > ").append(elt.toString()).append(lineSep); > } > write(serr, error.toString()); > @@ -77,7 +78,7 @@ public class StreamManager { > } > if (out instanceof Collection) { > final StringBuilder builder = new StringBuilder(); > - for (Object o : (Collection) out) { > + for (final Object o : (Collection) out) { > builder.append(string(o, lineSep)).append(lineSep); > } > return builder.toString(); > @@ -89,10 +90,10 @@ public class StreamManager { > if (!out.getClass().getName().startsWith("java")) { > try { > return new > GsonBuilder().setPrettyPrinting().create().toJson(out) > - .replace(OS_LINE_SEP, lineSep); > + .replace(OS_LINE_SEP, lineSep); > } catch (RuntimeException re) { > return ToStringBuilder.reflectionToString(out, > ToStringStyle.SHORT_PREFIX_STYLE) > - .replace(OS_LINE_SEP, lineSep); > + .replace(OS_LINE_SEP, lineSep); > } > } > return out.toString(); > @@ -123,7 +124,7 @@ public class StreamManager { > } > > public void writeOut(final String text, final String sep) { > - for (String line : text.split(sep)) { > + for (final String line : text.split(sep)) { > writeOut(line); > } > } > > Modified: > tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java > (original) > +++ > tomee/tomee/trunk/server/openejb-http/src/main/java/org/apache/openejb/server/httpd/BeginWebBeansListener.java > Tue Dec 10 16:54:47 2013 > @@ -61,7 +61,7 @@ public class BeginWebBeansListener imple > * > * @param webBeansContext the OWB context > */ > - public BeginWebBeansListener(WebBeansContext webBeansContext) { > + public BeginWebBeansListener(final WebBeansContext webBeansContext) { > this.webBeansContext = webBeansContext; > this.failoverService = > this.webBeansContext.getService(FailOverService.class); > this.contextKey = "org.apache.tomee.catalina.WebBeansListener@" > + webBeansContext.hashCode(); > @@ -71,7 +71,7 @@ public class BeginWebBeansListener imple > * {@inheritDoc} > */ > @Override > - public void requestDestroyed(ServletRequestEvent event) { > + public void requestDestroyed(final ServletRequestEvent event) { > // no-op > } > > @@ -127,7 +127,7 @@ public class BeginWebBeansListener imple > * {@inheritDoc} > */ > @Override > - public void sessionDestroyed(HttpSessionEvent event) { > + public void sessionDestroyed(final HttpSessionEvent event) { > ensureRequestScope(); > } > > @@ -139,21 +139,20 @@ public class BeginWebBeansListener imple > } > } > > - > @Override > - public void sessionWillPassivate(HttpSessionEvent event) { > + public void sessionWillPassivate(final HttpSessionEvent event) { > ensureRequestScope(); > } > > @Override > - public void sessionDidActivate(HttpSessionEvent event) { > + public void sessionDidActivate(final HttpSessionEvent event) { > if (failoverService.isSupportFailOver() || > failoverService.isSupportPassivation()) { > failoverService.sessionDidActivate(event.getSession()); > } > } > > @Override > - public void contextInitialized(ServletContextEvent > servletContextEvent) { > + public void contextInitialized(final ServletContextEvent > servletContextEvent) { > try { > > > OpenEJBLifecycle.initializeServletContext(servletContextEvent.getServletContext(), > webBeansContext); > } catch (final Exception e) { > @@ -163,7 +162,7 @@ public class BeginWebBeansListener imple > } > > @Override > - public void contextDestroyed(ServletContextEvent servletContextEvent) > { > + public void contextDestroyed(final ServletContextEvent > servletContextEvent) { > ensureRequestScope(); > } > } > > Modified: > tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java > (original) > +++ > tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java > Tue Dec 10 16:54:47 2013 > @@ -26,6 +26,7 @@ import java.net.URI; > import java.net.UnknownHostException; > import java.nio.charset.Charset; > import java.util.ArrayList; > +import java.util.Collections; > import java.util.Comparator; > import java.util.Enumeration; > import java.util.HashSet; > @@ -39,6 +40,7 @@ import java.util.concurrent.Future; > import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.atomic.AtomicInteger; > +import java.util.concurrent.locks.ReentrantLock; > > /** > * Licensed to the Apache Software Foundation (ASF) under one or more > @@ -66,9 +68,11 @@ public class MulticastPulseAgent impleme > > public static final String SERVER = "OpenEJB.MCP.Server:"; > public static final String CLIENT = "OpenEJB.MCP.Client:"; > + public static final String BADURI = ":BadUri:"; > public static final String EMPTY = "NoService"; > > - private final Set<String> ignore = new HashSet<String>(); > + private final ReentrantLock lock = new ReentrantLock(); > + private final Set<String> ignore = Collections.synchronizedSet(new > HashSet<String>()); > private final Set<URI> uriSet = new HashSet<URI>(); > private AtomicBoolean running = new AtomicBoolean(false); > final ArrayList<Future> futures = new ArrayList<Future>(); > @@ -84,12 +88,19 @@ public class MulticastPulseAgent impleme > private boolean loopbackOnly = true; > > /** > + * @author Andy Gumbrecht > * This agent listens for client pulses on a defined multicast > channel. > * On receipt of a valid pulse the agent responds with its own pulse > for > * a defined amount of time and rate. A client can deliver a pulse as > often as > * required until it is happy of the server response. > * <p/> > * Both server and client deliver crafted information payloads. > + * <p/> > + * The client pulse contains OpenEJB.MCP.Client:(group or > *)[:BadUri:URI] > + * The server will only respond to a request for it's own group or * > + * The optional :BadUri: is used by clients to notify a server that > it is sending out unreachable URI's > + * <p/> > + * The server response pulse contains > OpenEJB.MCP.Server:(Service|Service)|(Comma separated host list) > */ > public MulticastPulseAgent() { > } > @@ -111,7 +122,7 @@ public class MulticastPulseAgent impleme > length = 1; > } > > - executor = Executors.newFixedThreadPool(length * 2); > + executor = Executors.newFixedThreadPool(length * 3); > } > > return executor; > @@ -134,7 +145,7 @@ public class MulticastPulseAgent impleme > } > } > } catch (Exception e) { > - log.warning("Invalid ignore parameter. Should be a lowercase > single or comma seperated list like: ignore=host1,host2"); > + log.warning("Invalid ignore parameter. Should be a lowercase > single host or comma seperated list of hosts to ignore like: > ignore=host1,host2,ipv4,ipv6"); > } > > this.multicast = o.get("bind", this.multicast); > @@ -148,41 +159,59 @@ public class MulticastPulseAgent impleme > > private void buildPacket() throws SocketException { > > - this.loopbackOnly = true; > - for (final URI uri : this.uriSet) { > - if (!isLoopback(uri.getHost())) { > - this.loopbackOnly = false; > - break; > + final ReentrantLock l = this.lock; > + l.lock(); > + > + try { > + this.loopbackOnly = true; > + for (final URI uri : this.uriSet) { > + if (!isLoopback(uri.getHost())) { > + this.loopbackOnly = false; > + break; > + } > } > - } > > - final String hosts = getHosts(this.ignore); > - final StringBuilder sb = new StringBuilder(SERVER); > - sb.append(this.group); > - sb.append(':'); > + final String hosts = getHosts(this.ignore); > + final StringBuilder sb = new StringBuilder(SERVER); > + sb.append(this.group); > + sb.append(':'); > > - if (this.uriSet.size() > 0) { > - for (final URI uri : this.uriSet) { > - sb.append(uri.toASCIIString()); > + if (this.uriSet.size() > 0) { > + for (final URI uri : this.uriSet) { > + sb.append(uri.toASCIIString()); > + sb.append('|'); > + } > + } else { > + sb.append(EMPTY); > sb.append('|'); > } > - } else { > - sb.append(EMPTY); > - sb.append('|'); > - } > > - sb.append(hosts); > + sb.append(hosts); > + > + final byte[] bytes = (sb.toString()).getBytes(UTF8); > + this.response = new DatagramPacket(bytes, bytes.length, > this.address); > > - final byte[] bytes = (sb.toString()).getBytes(UTF8); > - this.response = new DatagramPacket(bytes, bytes.length, > this.address); > + if (log.isDebugEnabled()) { > + log.debug("MultiPulse packet is: " + sb); > + } > > - if (log.isDebugEnabled()) { > - log.debug("MultiPulse packet is: " + sb); > + if (bytes.length > 2048) { > + log.warning("MultiPulse packet is larger than 2048 bytes, > clients will not be able to read the packet" + > + "\n - You should define the 'ignore' property > to filter out unreachable addresses: " + sb); > + } > + } finally { > + l.unlock(); > } > + } > > - if (bytes.length > 2048) { > - log.warning("MultiPulse packet is larger than 2048 bytes, > clients will not be able to read the packet" + > - "\n - You should define the 'ignore' property to > filter out unreachable addresses: " + sb); > + public DatagramPacket getResponsePacket() { > + final ReentrantLock l = this.lock; > + l.lock(); > + > + try { > + return this.response; > + } finally { > + l.unlock(); > } > } > > @@ -191,6 +220,10 @@ public class MulticastPulseAgent impleme > this.listener = listener; > } > > + public DiscoveryListener getDiscoveryListener() { > + return listener; > + } > + > @Override > public void registerService(URI uri) throws IOException { > > @@ -267,7 +300,7 @@ public class MulticastPulseAgent impleme > continue; > } > > - final Sender sender = new Sender(this, socketKey, socket, > this.response); > + final Sender sender = new Sender(this, socketKey, socket); > this.futures.add(executorService.submit(sender)); > > this.futures.add(executorService.submit(new Runnable() { > @@ -289,34 +322,64 @@ public class MulticastPulseAgent impleme > > if (req.startsWith(CLIENT)) { > > - req = (req.replace(CLIENT, "")); > + final int ix = > req.indexOf(BADURI); > + String badUri = null; > + > + if (ix > 0) { > + //The client is notifying of > a bad uri > + badUri = > req.substring(ix).replace(BADURI, ""); > + req = req.substring(0, > ix).replace(CLIENT, ""); > + } else { > + req = (req.replace(CLIENT, > "")); > + } > > + //Is this a group or global pulse > request > if (mpg.equals(req) || > "*".equals(req)) { > > - final String client = > ((InetSocketAddress) sa).getAddress().getHostAddress(); > + //Is there a bad url and is > it this agent broadcasting the bad URI? > + if (null != badUri && > getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) { > + final ReentrantLock l = > MulticastPulseAgent.this.lock; > + l.lock(); > + > + try { > + //Remove it and > rebuild our broadcast packet > + if > (MulticastPulseAgent.this.ignore.add(badUri)) { > + > MulticastPulseAgent.this.buildPacket(); > + > + > MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + > badUri), false); > + > + log.warning("This > server has removed the unreachable host '" + badUri + "' from discovery, > you should consider adding" + > + " > this to the 'ignore' property in the multipulse.properties file"); > + } > + > + } finally { > + l.unlock(); > + } > + } else { > > - if (isLoopBackOnly) { > - //We only have local > services, so make sure the request is from a local source else ignore it > - if > (!MulticastPulseAgent.isLocalAddress(client, false)) { > + //Normal client multicast > pulse request > + final String client = > ((InetSocketAddress) sa).getAddress().getHostAddress(); > + > + if (isLoopBackOnly && > !MulticastPulseAgent.isLocalAddress(client, false)) { > + //We only have local > services, so make sure the request is from a local source else ignore it > if > (log.isDebugEnabled()) { > > log.debug(String.format("Ignoring remote client %1$s pulse request for > group: %2$s - No remote services available", > > client, > > req)); > } > - return; > - } > - } > + } else { > > - //We have received a valid > pulse request > - if (log.isDebugEnabled()) { > - > log.debug(String.format("Answering client '%1$s' pulse request for group: > '%2$s' on '%3$s'", client, req, socketKey)); > - } > + //We have received a > valid pulse request > + if > (log.isDebugEnabled()) { > + > log.debug(String.format("Answering client '%1$s' pulse request for group: > '%2$s' on '%3$s'", client, req, socketKey)); > + } > > - //Renew response pulse > - sender.pulseResponse(); > + //Renew response pulse > + > sender.pulseResponse(); > + } > + } > } > } > - > } > > } catch (Exception e) { > @@ -412,6 +475,27 @@ public class MulticastPulseAgent impleme > } > > /** > + * Lists current broadcast hosts as a comma separated list. > + * Used principally for testing. > + * > + * @return String > + */ > + public String getHosts() { > + return getHosts(this.ignore); > + } > + > + /** > + * Remove a host from the ignore list. > + * Used principally for testing. > + * > + * @param host String > + * @return True if removed, else false > + */ > + public boolean removeFromIgnore(final String host) { > + return this.ignore.remove(host); > + } > + > + /** > * Attempts to return at least one socket per valid network interface. > * If no valid interface is found then the array will be empty. > * > @@ -612,13 +696,11 @@ public class MulticastPulseAgent impleme > private final MulticastPulseAgent agent; > private final String socketKey; > private final MulticastSocket socket; > - private final DatagramPacket mpr; > > - private Sender(final MulticastPulseAgent agent, final String > socketKey, final MulticastSocket socket, final DatagramPacket mpr) { > + private Sender(final MulticastPulseAgent agent, final String > socketKey, final MulticastSocket socket) { > this.agent = agent; > this.socketKey = socketKey; > this.socket = socket; > - this.mpr = mpr; > } > > @Override > @@ -640,7 +722,7 @@ public class MulticastPulseAgent impleme > while (this.counter.decrementAndGet() > 0) { > > try { > - this.socket.send(this.mpr); > + this.socket.send(this.agent.getResponsePacket()); > } catch (Exception e) { > if (log.isDebugEnabled()) { > log.debug("MulticastPulseAgent client error: > " + e.getMessage(), e); > > Modified: > tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java > (original) > +++ > tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java > Tue Dec 10 16:54:47 2013 > @@ -48,11 +48,6 @@ import java.util.concurrent.TimeUnit; > import java.util.concurrent.atomic.AtomicBoolean; > import java.util.concurrent.locks.ReentrantLock; > > -/** > - * Copyright (c) ORPRO Vision GmbH. > - * Author: Andy Gumbrecht > - * Date: 11.06.12 > - */ > @SuppressWarnings("UseOfSystemOutOrSystemErr") > public class MulticastPulseAgentTest { > > @@ -229,7 +224,10 @@ public class MulticastPulseAgentTest { > final String[] serviceList = > services.split("\\|"); > final String[] hosts = s.split(","); > > - System.out.println(String.format("\n" > + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: > %2$s\n\tServer: %3$s\n", group, services, s)); > + System.out.println(String.format("\n" > + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: > %2$s\n\tServer: %3$s\n", > + > group, > + > services, > + s)); > > for (final String svc : serviceList) { > > @@ -392,6 +390,69 @@ public class MulticastPulseAgentTest { > org.junit.Assert.assertTrue(timeout == 1 || set.size() > 0); > } > > + @Test > + public void testBroadcastBadUri() throws Exception { > + > + final DiscoveryListener original = agent.getDiscoveryListener(); > + > + final CountDownLatch latch = new CountDownLatch(1); > + > + final DiscoveryListener listener = new DiscoveryListener() { > + @Override > + public void serviceAdded(final URI service) { > + latch.countDown(); > + System.out.println("added = " + service); > + } > + > + @Override > + public void serviceRemoved(final URI service) { > + latch.countDown(); > + System.out.println("removed = " + service); > + } > + }; > + > + agent.setDiscoveryListener(listener); > + > + final String[] hosts = agent.getHosts().split(","); > + final String host = hosts[hosts.length - 1]; > + > + final Future<?> future = executor.submit(new Runnable() { > + @Override > + public void run() { > + try { > + final InetAddress ia = > getAddress(MulticastPulseAgentTest.host); > + > + final byte[] bytes = (MulticastPulseAgent.CLIENT + > forGroup + MulticastPulseAgent.BADURI + > host).getBytes(Charset.forName("UTF-8")); > + final DatagramPacket request = new > DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port)); > + > + final MulticastSocket[] multicastSockets = > MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port); > + > + for (final MulticastSocket socket : multicastSockets) > { > + > + try { > + socket.send(request); > + } catch (Exception e) { > + System.out.println("Failed to broadcast bad > URI on: " + socket.getInterface().getHostAddress()); > + e.printStackTrace(); > + } > + } > + } catch (Exception e) { > + System.out.println("Failed to broadcast bad URI"); > + e.printStackTrace(); > + } > + } > + }); > + > + final Object o = future.get(10, TimeUnit.SECONDS); > + > + final boolean await = latch.await(20, TimeUnit.SECONDS); > + final boolean removed = agent.removeFromIgnore(host); > + > + agent.setDiscoveryListener(original); > + > + org.junit.Assert.assertTrue("Failed to remove host", removed && > await); > + } > + > private String ipFormat(final String h) throws UnknownHostException { > > final InetAddress ia = InetAddress.getByName(h); > @@ -402,6 +463,20 @@ public class MulticastPulseAgentTest { > } > } > > + private static InetAddress getAddress(final String host) throws > Exception { > + final InetAddress ia; > + try { > + ia = InetAddress.getByName(host); > + } catch (UnknownHostException e) { > + throw new Exception(host + " is not a valid address", e); > + } > + > + if (null == ia || !ia.isMulticastAddress()) { > + throw new Exception(host + " is not a valid multicast > address"); > + } > + return ia; > + } > + > private static class MyDiscoveryListener implements DiscoveryListener > { > > private final String id; > > Modified: tomee/tomee/trunk/server/openejb-server/pom.xml > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/pom.xml?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- tomee/tomee/trunk/server/openejb-server/pom.xml (original) > +++ tomee/tomee/trunk/server/openejb-server/pom.xml Tue Dec 10 16:54:47 > 2013 > @@ -65,6 +65,11 @@ > </dependency> > <dependency> > <groupId>org.apache.openejb</groupId> > + <artifactId>openejb-jee</artifactId> > + <version>${project.version}</version> > + </dependency> > + <dependency> > + <groupId>org.apache.openejb</groupId> > <artifactId>openejb-loader</artifactId> > <version>${project.version}</version> > </dependency> > @@ -84,6 +89,10 @@ > </dependency> > <dependency> > <groupId>org.apache.xbean</groupId> > + <artifactId>xbean-finder-shaded</artifactId> > + </dependency> > + <dependency> > + <groupId>org.apache.xbean</groupId> > <artifactId>xbean-reflect</artifactId> > </dependency> > <dependency> > > Modified: > tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java > (original) > +++ > tomee/tomee/trunk/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java > Tue Dec 10 16:54:47 2013 > @@ -155,7 +155,7 @@ public class SimpleServiceManager extend > @Override > public synchronized void start(final boolean block) throws > ServiceException { > > - if(stopped){ > + if (stopped) { > throw new ServiceException("Stop has already been called on > ServiceManager"); > } > > > Modified: > tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java > URL: > http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java?rev=1549891&r1=1549890&r2=1549891&view=diff > > ============================================================================== > --- > tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java > (original) > +++ > tomee/tomee/trunk/server/openejb-server/src/test/java/org/apache/openejb/server/FilteredServiceManagerWithAdminTest.java > Tue Dec 10 16:54:47 2013 > @@ -40,12 +40,12 @@ public class FilteredServiceManagerWithA > } > > @Test > - public void numberOfServices () { > + public void numberOfServices() { > // when using @EnableServices with the application composer > // the return value should be a FilteredServiceManager > assertEquals(FilteredServiceManager.class, > ServiceManager.get().getClass()); > > - FilteredServiceManager manager = (FilteredServiceManager) > ServiceManager.get(); > + final FilteredServiceManager manager = (FilteredServiceManager) > ServiceManager.get(); > assertEquals(1, manager.getDaemons().length); > assertEquals("admin", manager.getDaemons()[0].getName()); > } > > >