Author: fhanik
Date: Thu Apr 13 15:02:56 2006
New Revision: 393958
URL: http://svn.apache.org/viewcvs?rev=393958&view=rev
Log:
Fixed Map state transfer, removed not needed curPos variable from the sender
since we are reusing a write buffer
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
Thu Apr 13 15:02:56 2006
@@ -132,16 +132,21 @@
this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
this.channel.addChannelListener(this);
this.channel.addMembershipListener(this);
-
- //transfer state from another map
- transferState();
+
+
try {
+ broadcast(MapMessage.MSG_INIT, true);
+ //transfer state from another map
+ transferState();
broadcast(MapMessage.MSG_START, true);
} catch (ChannelException x) {
log.warn("Unable to send map start message.");
+ throw new RuntimeException("Unable to start replicated map.",x);
}
}
+
+
private void broadcast(int msgtype, boolean rpc) throws ChannelException {
//send out a map membership message, only wait for the first reply
@@ -190,6 +195,17 @@
return result;
}
}
+
+ public Member[] getMapMembersExcl(Member[] exclude) {
+ synchronized (mapMembers) {
+ ArrayList list = (ArrayList)mapMembers.clone();
+ for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
+ Member[] result = new Member[list.size()];
+ list.toArray(result);
+ return result;
+ }
+ }
+
/**
* Replicates any changes to the object since the last time
@@ -276,6 +292,8 @@
messageReceived( (Serializable) list.get(i),
resp[0].getSource());
} //for
}
+ } else {
+ log.warn("Transfer state, 0 replies, probably a timeout.");
}
}
} catch (ChannelException x) {
@@ -297,6 +315,12 @@
if (! (msg instanceof MapMessage))return null;
MapMessage mapmsg = (MapMessage) msg;
+ //map init request
+ if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
+ mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+ return mapmsg;
+ }
+
//map start request
if (mapmsg.getMsgType() == mapmsg.MSG_START) {
mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
@@ -607,10 +631,6 @@
public Object setValue(Object value) {
Object old = this.value;
this.value = (Serializable) value;
- if ( value==null ) {
- Exception x = new Exception(this.toString());
- x.printStackTrace();
- }
return old;
}
@@ -682,6 +702,7 @@
public static final int MSG_STATE = 5;
public static final int MSG_START = 6;
public static final int MSG_STOP = 7;
+ public static final int MSG_INIT = 8;
private byte[] mapId;
private int msgtype;
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Thu Apr 13 15:02:56 2006
@@ -179,7 +179,7 @@
} else if ( entry.isProxy() ) {
//invalidate the previous primary
msg = new
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
- getChannel().send(backup,msg,getChannelSendOptions());
+
getChannel().send(getMapMembersExcl(backup),msg,getChannelSendOptions());
}
entry.setBackupNodes(backup);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
Thu Apr 13 15:02:56 2006
@@ -59,7 +59,6 @@
protected ByteBuffer readbuf = null;
protected ByteBuffer writebuf = null;
protected byte[] current = null;
- protected int curPos=0;
protected XByteBuffer ackbuf = new XByteBuffer(128,true);
protected int remaining = 0;
protected boolean complete;
@@ -174,16 +173,14 @@
//weve written everything, or we are starting a new package
//protect against buffer overwrite
int byteswritten = socketChannel.write(writebuf);
- curPos += byteswritten;
remaining -= byteswritten;
//if the entire message was written from the buffer
//reset the position counter
- if ( curPos >= current.length ) {
- curPos = 0;
+ if ( remaining < 0 ) {
remaining = 0;
}
}
- return (remaining==0 && curPos == 0);
+ return (remaining==0);
}
//no message to send, we can consider that complete
return true;
@@ -255,7 +252,6 @@
if ( readbuf != null ) readbuf.clear();
if ( writebuf != null ) writebuf.clear();
current = null;
- curPos = 0;
ackbuf.clear();
remaining = 0;
complete = false;
@@ -291,7 +287,6 @@
if ( data != null ) {
current = data;
remaining = length;
- curPos = offset;
ackbuf.clear();
if ( writebuf != null ) writebuf.clear();
else writebuf = getBuffer(length);
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Apr 13 15:02:56 2006
@@ -30,7 +30,7 @@
Bugs:
===========================================
a) Somehow the first NIO connection made, always closes down, why
- b) State synchronization for the map
+ b) State synchronization for the map - will need to add in MSG_INIT
Code Tasks:
===========================================
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]