Author: fhanik
Date: Thu Apr 13 15:02:56 2006
New Revision: 393958

URL: http://svn.apache.org/viewcvs?rev=393958&view=rev
Log:
Fixed Map state transfer, removed not needed curPos variable from the sender 
since we are reusing a write buffer 

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 Thu Apr 13 15:02:56 2006
@@ -132,16 +132,21 @@
         this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
         this.channel.addChannelListener(this);
         this.channel.addMembershipListener(this);
-
-        //transfer state from another map
-        transferState();
+        
+        
         try {
+            broadcast(MapMessage.MSG_INIT, true);
+            //transfer state from another map
+            transferState();
             broadcast(MapMessage.MSG_START, true);
         } catch (ChannelException x) {
             log.warn("Unable to send map start message.");
+            throw new RuntimeException("Unable to start replicated map.",x);
         }
 
     }
+    
+    
 
     private void broadcast(int msgtype, boolean rpc) throws ChannelException {
         //send out a map membership message, only wait for the first reply
@@ -190,6 +195,17 @@
             return result;
         }
     }
+    
+    public Member[] getMapMembersExcl(Member[] exclude) {
+        synchronized (mapMembers) {
+            ArrayList list = (ArrayList)mapMembers.clone();
+            for (int i=0; i<exclude.length;i++) list.remove(exclude[i]);
+            Member[] result = new Member[list.size()];
+            list.toArray(result);
+            return result;
+        }
+    }
+
 
     /**
      * Replicates any changes to the object since the last time
@@ -276,6 +292,8 @@
                             messageReceived( (Serializable) list.get(i), 
resp[0].getSource());
                         } //for
                     }
+                } else {
+                    log.warn("Transfer state, 0 replies, probably a timeout.");
                 }
             }
         } catch (ChannelException x) {
@@ -297,6 +315,12 @@
         if (! (msg instanceof MapMessage))return null;
         MapMessage mapmsg = (MapMessage) msg;
 
+        //map init request
+        if (mapmsg.getMsgType() == mapmsg.MSG_INIT) {
+            mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
+            return mapmsg;
+        }
+        
         //map start request
         if (mapmsg.getMsgType() == mapmsg.MSG_START) {
             mapmsg.setBackUpNodes(wrap(channel.getLocalMember(false)));
@@ -607,10 +631,6 @@
         public Object setValue(Object value) {
             Object old = this.value;
             this.value = (Serializable) value;
-            if ( value==null ) {
-                Exception x = new Exception(this.toString());
-                x.printStackTrace();
-            }
             return old;
         }
 
@@ -682,6 +702,7 @@
         public static final int MSG_STATE = 5;
         public static final int MSG_START = 6;
         public static final int MSG_STOP = 7;
+        public static final int MSG_INIT = 8;
 
         private byte[] mapId;
         private int msgtype;

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Thu Apr 13 15:02:56 2006
@@ -179,7 +179,7 @@
                 } else if ( entry.isProxy() ) {
                     //invalidate the previous primary
                     msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,backup);
-                    getChannel().send(backup,msg,getChannelSendOptions());
+                    
getChannel().send(getMapMembersExcl(backup),msg,getChannelSendOptions());
                 }
 
                 entry.setBackupNodes(backup);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioSender.java
 Thu Apr 13 15:02:56 2006
@@ -59,7 +59,6 @@
     protected ByteBuffer readbuf = null;
     protected ByteBuffer writebuf = null;
     protected byte[] current = null;
-    protected int curPos=0;
     protected XByteBuffer ackbuf = new XByteBuffer(128,true);
     protected int remaining = 0;
     protected boolean complete;
@@ -174,16 +173,14 @@
                 //weve written everything, or we are starting a new package
                 //protect against buffer overwrite
                 int byteswritten = socketChannel.write(writebuf);
-                curPos += byteswritten;
                 remaining -= byteswritten;
                 //if the entire message was written from the buffer
                 //reset the position counter
-                if ( curPos >= current.length ) {
-                    curPos = 0;
+                if ( remaining < 0 ) {
                     remaining = 0;
                 }
             }
-            return (remaining==0 && curPos == 0);
+            return (remaining==0);
         }
         //no message to send, we can consider that complete
         return true;
@@ -255,7 +252,6 @@
         if ( readbuf != null ) readbuf.clear();
         if ( writebuf != null ) writebuf.clear();
         current = null;
-        curPos = 0;
         ackbuf.clear();
         remaining = 0;
         complete = false;
@@ -291,7 +287,6 @@
        if ( data != null ) {
            current = data;
            remaining = length;
-           curPos = offset;
            ackbuf.clear();
            if ( writebuf != null ) writebuf.clear();
            else writebuf = getBuffer(length);

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=393958&r1=393957&r2=393958&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Thu Apr 13 15:02:56 2006
@@ -30,7 +30,7 @@
 Bugs:
 ===========================================
  a) Somehow the first NIO connection made, always closes down, why
- b) State synchronization for the map
+ b) State synchronization for the map - will need to add in MSG_INIT
 
 Code Tasks:
 ===========================================



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to