Author: fhanik Date: Thu Apr 13 15:02:56 2006 New Revision: 393958 URL: http://svn.apache.org/viewcvs?rev=393958&view=rev Log: Fixed Map state transfer; removed the unneeded curPos variable from the sender, since we are reusing a write buffer
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=393958&r1=393957&r2=393958&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Apr 13 15:02:56 2006 @@ -132,16 +132,21 @@ this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); this.channel.addChannelListener(this); this.channel.addMembershipListener(this); - - //transfer state from another map - transferState(); + + try { + broadcast(MapMessage.MSG_INIT, true); + //transfer state from another map + transferState(); broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn("Unable to send map start message."); + throw new RuntimeException("Unable to start replicated map.",x); } } + + private void broadcast(int msgtype, boolean rpc) throws ChannelException { //send out a map membership message, only wait for the first reply @@ -190,6 +195,17 @@ return result; } } + + public Member[] getMapMembersExcl(Member[] exclude) { + synchronized (mapMembers) { + ArrayList list = (ArrayList)mapMembers.clone(); + for (int i=0; i<exclude.length;i++) list.remove(exclude[i]); + Member[] result = new 
Member[list.size()]; + list.toArray(result); + return result; + } + } + /** * Replicates any changes to the object since the last time @@ -276,6 +292,8 @@ messageReceived( (Serializable) list.get(i), resp[0].getSource()); } //for } + } else { + log.warn("Transfer state, 0 replies, probably a timeout."); } } } catch (ChannelException x) { @@ -297,6 +315,12 @@ if (! (msg instanceof MapMessage))return null; MapMessage mapmsg = (MapMessage) msg; + //map init request + if (mapmsg.getMsgType() == mapmsg.MSG_INIT) { + mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false))); + return mapmsg; + } + //map start request if (mapmsg.getMsgType() == mapmsg.MSG_START) { mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false))); @@ -607,10 +631,6 @@ public Object setValue(Object value) { Object old = this.value; this.value = (Serializable) value; - if ( value==null ) { - Exception x = new Exception(this.toString()); - x.printStackTrace(); - } return old; } @@ -682,6 +702,7 @@ public static final int MSG_STATE = 5; public static final int MSG_START = 6; public static final int MSG_STOP = 7; + public static final int MSG_INIT = 8; private byte[] mapId; private int msgtype; Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=393958&r1=393957&r2=393958&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Apr 13 15:02:56 2006 @@ -179,7 +179,7 @@ } else if ( entry.isProxy() ) { //invalidate the previous primary msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup); - getChannel().send(backup,msg,getChannelSendOptions()); + getChannel().send(getMapMembersExcl(backup),msg,getChannelSendOptions()); } entry.setBackupNodes(backup); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=393958&r1=393957&r2=393958&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java Thu Apr 13 15:02:56 2006 @@ -59,7 +59,6 @@ protected ByteBuffer readbuf = null; protected ByteBuffer writebuf = null; protected byte[] current = null; - protected int curPos=0; protected XByteBuffer ackbuf = new XByteBuffer(128,true); protected int remaining = 0; protected boolean complete; @@ -174,16 +173,14 @@ //weve written everything, or we are starting a new package //protect against buffer overwrite int byteswritten = socketChannel.write(writebuf); - curPos += byteswritten; remaining -= byteswritten; //if the entire message was written from the buffer //reset the position counter - if ( curPos >= current.length ) { - curPos = 0; + if ( remaining < 0 ) { remaining = 0; } } - return (remaining==0 && curPos == 0); + return (remaining==0); } //no message to send, we can consider that complete return true; @@ -255,7 +252,6 @@ if ( readbuf != null ) readbuf.clear(); if ( writebuf != null ) writebuf.clear(); current = null; - curPos = 0; ackbuf.clear(); remaining = 0; complete = false; @@ -291,7 +287,6 @@ if ( data != null ) { current = data; remaining = length; - curPos = offset; ackbuf.clear(); if ( 
writebuf != null ) writebuf.clear(); else writebuf = getBuffer(length); Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=393958&r1=393957&r2=393958&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original) +++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Apr 13 15:02:56 2006 @@ -30,7 +30,7 @@ Bugs: =========================================== a) Somehow the first NIO connection made, always closes down, why - b) State synchronization for the map + b) State synchronization for the map - will need to add in MSG_INIT Code Tasks: =========================================== --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]