Author: asmuts Date: Tue Jul 21 19:00:11 2009 New Revision: 796476 URL: http://svn.apache.org/viewvc?rev=796476&view=rev Log: Moving UDP discovery out of the TCP lateral package. I'm making something that can be used by other auxiliaries. I'm not finished yet.
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java Tue Jul 21 19:00:11 2009 @@ -168,19 +168,23 @@ // create the UDP discovery for the TCP lateral if ( lac.isUdpDiscoveryEnabled() ) { - LateralTCPDiscoveryListener discoveryListener = new LateralTCPDiscoveryListener( cacheMgr, cacheEventLogger, - elementSerializer ); + // TODO this will create one for each region, but one can be used for all regions + LateralTCPDiscoveryListener discoveryListener = new LateralTCPDiscoveryListener( cacheMgr, + cacheEventLogger, + elementSerializer ); discoveryListener.addNoWaitFacade( lcnwf, lac.getCacheName() ); - + // need a factory for this so it doesn't // get 
dereferenced, also we don't want one for every region. - discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(), lac.getTcpListenerPort(), cacheMgr, cacheEventLogger, - elementSerializer ); + discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(), + lac.getUdpDiscoveryPort(), + lac.getTcpListenerPort(), cacheMgr, + cacheEventLogger ); discovery.addParticipatingCacheName( lac.getCacheName() ); - discovery.setDiscoveryListener( discoveryListener ); - + discovery.addDiscoveryListener( discoveryListener ); + if ( log.isInfoEnabled() ) { log.info( "Created UDPDiscoveryService for TCP lateral cache." ); Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java Tue Jul 21 19:00:11 2009 @@ -18,7 +18,11 @@ import org.apache.jcs.utils.discovery.DiscoveredService; import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener; -/** This knows how to add and remove discovered services. */ +/** + * This knows how to add and remove discovered services. + * <p> + * We can have one listener per region, or one shared by all regions. + */ public class LateralTCPDiscoveryListener implements IDiscoveryListener { @@ -80,6 +84,9 @@ * message, the add no wait will be called here. To add a no wait, the facade is looked up for * this cache name. * <p> + * Each region has a facade. The facade contains a list of end points--the other tcp lateral + * services. 
+ * <p> * @param noWait */ protected void addNoWait( LateralCacheNoWait noWait ) @@ -150,11 +157,7 @@ // get a cache and add it to the no waits // the add method should not add the same. // we need the listener port from the original config. - ITCPLateralCacheAttributes lca = new TCPLateralCacheAttributes(); - lca.setTransmissionType( LateralCacheAttributes.TCP ); - lca.setTcpServer( service.getServiceAddress() + ":" + service.getServicePort() ); - LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger, - elementSerializer ); + LateralTCPCacheManager lcm = findManagerForServiceEndPoint( service ); ArrayList regions = service.getCacheNames(); if ( regions != null ) @@ -180,7 +183,7 @@ addNoWait( (LateralCacheNoWait) ic ); if ( log.isDebugEnabled() ) { - log.debug( "Called addNoWait for cacheName " + cacheName ); + log.debug( "Called addNoWait for cacheName [" + cacheName + "]" ); } } } @@ -189,7 +192,6 @@ log.error( "Problem creating no wait", e ); } } - // end while } else { @@ -199,10 +201,69 @@ /** * Removes the lateral cache. + * <p> * @param service */ public void removeDiscoveredService( DiscoveredService service ) { - // TODO Auto-generated method stub + // get a cache and add it to the no waits + // the add method should not add the same. + // we need the listener port from the original config. 
+ LateralTCPCacheManager lcm = findManagerForServiceEndPoint( service ); + + ArrayList regions = service.getCacheNames(); + if ( regions != null ) + { + // for each region get the cache + Iterator it = regions.iterator(); + while ( it.hasNext() ) + { + String cacheName = (String) it.next(); + + try + { + ICache ic = lcm.getCache( cacheName ); + + if ( log.isDebugEnabled() ) + { + log.debug( "Got cache, ic = " + ic ); + } + + // remove this to the nowaits for this cachename + if ( ic != null ) + { + removeNoWait( (LateralCacheNoWait) ic ); + if ( log.isDebugEnabled() ) + { + log.debug( "Called removeNoWait for cacheName [" + cacheName + "]" ); + } + } + } + catch ( Exception e ) + { + log.error( "Problem removing no wait", e ); + } + } + } + else + { + log.warn( "No cache names found in message " + service ); + } + } + + /** + * Gets the appropriate manager. + * <p> + * @param service + * @return LateralTCPCacheManager configured for that end point. + */ + private LateralTCPCacheManager findManagerForServiceEndPoint( DiscoveredService service ) + { + ITCPLateralCacheAttributes lca = new TCPLateralCacheAttributes(); + lca.setTransmissionType( LateralCacheAttributes.TCP ); + lca.setTcpServer( service.getServiceAddress() + ":" + service.getServicePort() ); + LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger, + elementSerializer ); + return lcm; } } Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java Tue Jul 21 19:00:11 2009 @@ -25,7 +25,6 @@ import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.jcs.engine.behavior.ICompositeCacheManager; -import org.apache.jcs.engine.behavior.IElementSerializer; import org.apache.jcs.engine.behavior.IShutdownObservable; import org.apache.jcs.engine.logging.behavior.ICacheEventLogger; @@ -73,13 +72,11 @@ * @param servicePort * @param cacheMgr * @param cacheEventLogger - * @param elementSerializer * @return UDPDiscoveryService */ public synchronized UDPDiscoveryService getService( String discoveryAddress, int discoveryPort, int servicePort, ICompositeCacheManager cacheMgr, - ICacheEventLogger cacheEventLogger, - IElementSerializer elementSerializer ) + ICacheEventLogger cacheEventLogger ) { String key = discoveryAddress + ":" + discoveryPort + ":" + servicePort; @@ -97,6 +94,8 @@ attributes.setServicePort( servicePort ); service = new UDPDiscoveryService( attributes, cacheEventLogger ); + + // register for shutdown notification ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( service ); services.put( key, service ); Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java Tue Jul 21 19:00:11 2009 @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.jcs.auxiliary.lateral.LateralCacheInfo; import org.apache.jcs.engine.behavior.IShutdownObserver; import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; @@ -92,7 +91,7 @@ // create a small thread pool to handle a barrage pooledExecutor = new 
PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize ); pooledExecutor.discardOldestWhenBlocked(); - pooledExecutor.setMinimumPoolSize(1); + //pooledExecutor.setMinimumPoolSize(1); pooledExecutor.setThreadFactory( new MyThreadFactory() ); if ( log.isInfoEnabled() ) @@ -125,11 +124,15 @@ try { mSocket = new MulticastSocket( multicastPort ); + if ( log.isInfoEnabled() ) + { + log.info( "Joining Group: [" + InetAddress.getByName( multicastAddressString ) + "]" ); + } mSocket.joinGroup( InetAddress.getByName( multicastAddressString ) ); } catch ( IOException e ) { - log.error( "Could not bind to multicast address [" + multicastAddressString + ":" + multicastPort + "]", e ); + log.error( "Could not bind to multicast address [" + InetAddress.getByName( multicastAddressString ) + ":" + multicastPort + "]", e ); throw e; } } @@ -149,6 +152,11 @@ Object obj = null; try { + if ( log.isDebugEnabled() ) + { + log.debug( "Waiting for message." ); + } + mSocket.receive( packet ); if ( log.isDebugEnabled() ) @@ -157,9 +165,7 @@ } final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() ); - final ObjectInputStream objectStream = new ObjectInputStream( byteStream ); - obj = objectStream.readObject(); if ( log.isDebugEnabled() ) @@ -275,7 +281,7 @@ public void run() { // consider comparing ports here instead. 
- if ( message.getRequesterId() == LateralCacheInfo.listenerId ) + if ( message.getRequesterId() == UDPDiscoveryInfo.listenerId ) { if ( log.isDebugEnabled() ) { @@ -370,6 +376,7 @@ try { shutdown = true; + mSocket.leaveGroup( InetAddress.getByName( multicastAddressString ) ); mSocket.close(); pooledExecutor.shutdownNow(); } Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java Tue Jul 21 19:00:11 2009 @@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.ObjectOutputStream; import java.net.DatagramPacket; import java.net.InetAddress; import java.net.MulticastSocket; @@ -29,7 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.jcs.auxiliary.lateral.LateralCacheInfo; +import org.apache.jcs.utils.serialization.StandardSerializer; /** * This is a generic sender for the UDPDiscovery process. @@ -50,6 +49,9 @@ /** The port */ private int multicastPort; + /** Used to serialize messages */ + private StandardSerializer serializer = new StandardSerializer(); + /** * Constructor for the UDPDiscoverySender object * <p> @@ -69,8 +71,8 @@ if ( log.isInfoEnabled() ) { log.info( "Constructing socket for sender." ); - } - localSocket = new MulticastSocket(); + } + localSocket = new MulticastSocket( port ); // Remote address. 
multicastAddress = InetAddress.getByName( host ); @@ -137,25 +139,22 @@ if ( log.isDebugEnabled() ) { log.debug( "sending UDPDiscoveryMessage, address [" + multicastAddress + "], port [" + multicastPort - + "], message = " + message ); } + + "], message = " + message ); + } try { - // write the object to a byte array. - final MyByteArrayOutputStream byteStream = new MyByteArrayOutputStream(); - final ObjectOutputStream objectStream = new ObjectOutputStream( byteStream ); - objectStream.writeObject( message ); - objectStream.flush(); - final byte[] bytes = byteStream.getBytes(); + final byte[] bytes = serializer.serialize( message ); // put the byte array in a packet final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort ); if ( log.isDebugEnabled() ) { - log.debug( "Sending DatagramPacket. bytes.length [" + bytes.length + "] to " + multicastAddress + ":" + multicastPort ); + log.debug( "Sending DatagramPacket. bytes.length [" + bytes.length + "] to " + multicastAddress + ":" + + multicastPort ); } - + localSocket.send( packet ); } catch ( IOException e ) @@ -186,7 +185,7 @@ } /** - * This sends a message broadcasting our that the host and port is available for connections. + * This sends a message broadcasting out that the host and port is available for connections. 
* <p> * It uses the vmid as the requesterDI * @param host @@ -197,7 +196,7 @@ public void passiveBroadcast( String host, int port, ArrayList cacheNames ) throws IOException { - passiveBroadcast( host, port, cacheNames, LateralCacheInfo.listenerId ); + passiveBroadcast( host, port, cacheNames, UDPDiscoveryInfo.listenerId ); } /** @@ -239,7 +238,7 @@ public void removeBroadcast( String host, int port, ArrayList cacheNames ) throws IOException { - removeBroadcast( host, port, cacheNames, LateralCacheInfo.listenerId ); + removeBroadcast( host, port, cacheNames, UDPDiscoveryInfo.listenerId ); } /** Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java (original) +++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java Tue Jul 21 19:00:11 2009 @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.Set; @@ -77,8 +78,8 @@ /** This a list of regions that are configured to use discovery. */ private Set cacheNames = new CopyOnWriteArraySet(); - /** handles add and remove. consider making into a set. */ - private IDiscoveryListener discoveryListener; + /** Set of listeners. 
*/ + private Set discoveryListeners = new CopyOnWriteArraySet(); /** * @param attributes @@ -208,7 +209,12 @@ public void removeDiscoveredService( DiscoveredService service ) { getDiscoveredServices().remove( service ); - getDiscoveryListener().removeDiscoveredService( service ); + + Iterator it = getDiscoveryListeners().iterator(); + while ( it.hasNext() ) + { + ( (IDiscoveryListener) it.next() ).removeDiscoveredService( service ); + } } /** @@ -234,7 +240,11 @@ getDiscoveredServices().add( discoveredService ); // todo update some list of cachenames - getDiscoveryListener().addDiscoveredService( discoveredService ); + Iterator it = getDiscoveryListeners().iterator(); + while ( it.hasNext() ) + { + ( (IDiscoveryListener) it.next() ).addDiscoveredService( discoveredService ); + } } else { @@ -404,18 +414,42 @@ } /** - * @param discoveryListener the discoveryListener to set + * @return the discoveryListeners */ - public void setDiscoveryListener( IDiscoveryListener discoveryListener ) + private Set getDiscoveryListeners() { - this.discoveryListener = discoveryListener; + return discoveryListeners; } /** - * @return the discoveryListener + * @return the discoveryListeners + */ + public Set getCopyOfDiscoveryListeners() + { + Set copy = new HashSet(); + copy.addAll( getDiscoveryListeners() ); + return copy; + } + + /** + * Adds a listener. + * <p> + * @param listener + * @return true if it wasn't already in the set + */ + public boolean addDiscoveryListener( IDiscoveryListener listener ) + { + return getDiscoveryListeners().add( listener ); + } + + /** + * Removes a listener. 
+ * <p> + * @param listener + * @return true if it was in the set */ - public IDiscoveryListener getDiscoveryListener() + public boolean removeDiscoveryListener( IDiscoveryListener listener ) { - return discoveryListener; + return getDiscoveryListeners().remove( listener ); } } Modified: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java (original) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java Tue Jul 21 19:00:11 2009 @@ -33,7 +33,7 @@ private static final String ADDRESS = "228.4.5.9"; /** multicast address to send/receive on */ - private static final int PORT = 5555; + private static final int PORT = 5556; /** imaginary host address for sending */ private static final String SENDING_HOST = "imaginary host address"; Modified: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java (original) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java Tue Jul 21 19:00:11 2009 @@ -46,7 +46,7 @@ service.addParticipatingCacheName( "testCache1" ); MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.setDiscoveryListener( discoveryListener ); + service.addDiscoveryListener( discoveryListener ); DiscoveredService 
discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); @@ -80,7 +80,7 @@ service.addParticipatingCacheName( "testCache1" ); MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.setDiscoveryListener( discoveryListener ); + service.addDiscoveryListener( discoveryListener ); DiscoveredService discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); @@ -135,7 +135,7 @@ service.addParticipatingCacheName( "testCache1" ); MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.setDiscoveryListener( discoveryListener ); + service.addDiscoveryListener( discoveryListener ); DiscoveredService discoveredService = new DiscoveredService(); discoveredService.setServiceAddress( host ); Modified: jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java?rev=796476&r1=796475&r2=796476&view=diff ============================================================================== --- jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java (original) +++ jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java Tue Jul 21 19:00:11 2009 @@ -49,7 +49,7 @@ service.addParticipatingCacheName( "testCache1" ); MockDiscoveryListener discoveryListener = new MockDiscoveryListener(); - service.setDiscoveryListener( discoveryListener ); + service.addDiscoveryListener( discoveryListener ); // create a receiver with the service UDPDiscoveryReceiver receiver = new UDPDiscoveryReceiver( service, attributes.getUdpDiscoveryAddr(), attributes --------------------------------------------------------------------- To unsubscribe, e-mail: jcs-dev-unsubscr...@jakarta.apache.org For additional commands, e-mail: jcs-dev-h...@jakarta.apache.org