Author: fhanik
Date: Tue Feb 28 15:31:26 2006
New Revision: 381827

URL: http://svn.apache.org/viewcvs?rev=381827&view=rev
Log:
Cleaned up synchronization for the frag interceptor, only need to synchronize on write to the map, not read.

Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java?rev=381827&r1=381826&r2=381827&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/FragmentationInterceptor.java Tue Feb 28 15:31:26 2006
@@ -24,6 +24,9 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
 import org.apache.catalina.tribes.io.XByteBuffer;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Set;

 /**
  *
@@ -39,6 +42,8 @@
  * @version 1.0
  */
 public class FragmentationInterceptor extends ChannelInterceptorBase {
+    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog( FragmentationInterceptor.class );
+
     protected HashMap fragpieces = new HashMap();
     private int maxSize = 1024*100;
     private long expire = 1000 * 60; //one minute expiration
@@ -68,16 +73,21 @@
     }


-    public synchronized FragCollection getFragCollection(FragKey key, ChannelMessage msg) {
+    public FragCollection getFragCollection(FragKey key, ChannelMessage msg) {
         FragCollection coll = (FragCollection)fragpieces.get(key);
         if ( coll == null ) {
-            coll = new FragCollection(msg);
-            fragpieces.put(key,coll);
+            synchronized (fragpieces) {
+                coll = (FragCollection)fragpieces.get(key);
+                if ( coll == null ) {
+                    coll = new FragCollection(msg);
+                    fragpieces.put(key, coll);
+                }
+            }
         }
         return coll;
     }

-    public synchronized void removeFragCollection(FragKey key) {
+    public void removeFragCollection(FragKey key) {
         fragpieces.remove(key);
     }

@@ -120,6 +130,23 @@
         for ( int i=0; i<messages.length; i++ ) {
             super.sendMessage(destination,messages[i],payload);
         }
+    }
+
+    public void heartbeat() {
+        try {
+            Set set = fragpieces.keySet();
+            Object[] keys = set.toArray();
+            for ( int i=0; i<keys.length; i++ ) {
+                FragKey key = (FragKey)keys[i];
+                if ( key != null && key.expired(getExpire()) )
+                    removeFragCollection(key);
+            }
+        }catch ( Exception x ) {
+            if ( log.isErrorEnabled() ) {
+                log.error("Unable to perform heartbeat clean up in the frag interceptor",x);
+            }
+        }
+        super.heartbeat();
     }


Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=381827&r1=381826&r2=381827&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Tue Feb 28 15:31:26 2006
@@ -1,24 +1,25 @@
 To Do:
+===========================================
+
+Documentation:
+===========================================
-Tasks:
-4. ChannelMessage.getMessage should return streamable, that way we can wrap,
-pass it around and all those good things without having to copy byte arrays
-left and right
-5. NIO and IO DataSender, since the IO is blocking
-Interceptors to create
-1. OrderInterceptor - guarantees the order of messages
-2. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning
+Code Tasks:
+===========================================
+
+6. NIO and IO DataSender, since the IO is blocking
+
+8. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning
 (This is useful when synchronized=false and waitForAck=false, to improve parallel processing, but you want to have all messages sent in parallel and don't return until all have been processed on the remote end.)
-3. FragmentationInterceptor - splits up messages that are larger than X bytes.
-4. CoordinatorInterceptor - manages the selection of a cluster coordinator
-5. VirtualSynchronyInterceptor - not sure we want to build this one, it would be
-pretty slow, but it would guarantee that all messages were received, to the
-members in that group in that time.
+
+9. CoordinatorInterceptor - manages the selection of a cluster coordinator
+10. Xa2PhaseCommitInterceptor - make sure the message doesn't reach the receiver unless all members got it
 Tasks Completed
+===========================================
 1. True synchronized/asynchronized replication enabled using flags Sender.sendAck/Receiver.waitForAck/Receiver.synchronized
 Task Desc: waitForAck - should only mean, we received the message, not for the
@@ -27,9 +28,22 @@
 Status: Complete
 Notes:
-
 2. Unique id, send it in byte array instead of string
 3. DataSender or ReplicationTransmitter swallows IOException, this should be
 Notes: This has only been fixed for the pooled synchronized. the fastasynch aint working that well
+
+4. ChannelMessage.getMessage should return streamable, that way we can wrap,
+pass it around and all those good things without having to copy byte arrays
+left and right
+Notes: Instead of using a streamable, this is implemented using the XByteBuffer,
+       which is very easy to use. It also becomes a single spot for optimizations.
+       Ideally, there would be a pool of XByteBuffers, that all use direct ByteBuffers
+       for its data handling.
+
+5. OrderInterceptor - guarantees the order of messages
+Notes: completed
+
+7. FragmentationInterceptor - splits up messages that are larger than X bytes.
+Notes: complated
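A note on the log message's premise that only writes to the map need synchronization: java.util.HashMap is documented as unsafe when one thread modifies it structurally while other threads access it, so the unsynchronized get() and remove() in the diff above can still race with the put() inside the synchronized block. The sketch below shows one possible alternative, not what this commit does. It assumes java.util.concurrent (Java 5 or later) is available; FragRegistrySketch and Pieces are hypothetical stand-ins for the interceptor's map and for FragCollection, and String keys stand in for FragKey.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the interceptor's fragment map; not Tribes code.
class FragRegistrySketch {

    // Hypothetical placeholder for FragCollection: remembers the message that created it.
    static final class Pieces {
        final String firstMessage;
        Pieces(String firstMessage) { this.firstMessage = firstMessage; }
    }

    // ConcurrentHashMap allows concurrent reads and writes without external locking.
    private final ConcurrentMap<String, Pieces> fragpieces =
            new ConcurrentHashMap<String, Pieces>();

    // Get-or-create: the fast path is a plain read, the slow path uses putIfAbsent,
    // which atomically keeps whichever collection was registered first.
    Pieces getOrCreate(String key, String msg) {
        Pieces coll = fragpieces.get(key);
        if (coll == null) {
            Pieces created = new Pieces(msg);
            Pieces raced = fragpieces.putIfAbsent(key, created);
            coll = (raced != null) ? raced : created;
        }
        return coll;
    }

    // Removal is safe to call concurrently with getOrCreate.
    void remove(String key) {
        fragpieces.remove(key);
    }

    int size() {
        return fragpieces.size();
    }
}
```

With this shape, two callers asking for the same key always receive the same collection, and a heartbeat-style cleanup can call remove() from another thread without an explicit lock. The cost is that a losing thread may construct a Pieces object that is immediately discarded.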