Author: kfujino
Date: Thu Apr 23 08:51:18 2015
New Revision: 1675559

URL: http://svn.apache.org/r1675559
Log:
Backport r1653423, r1660266, r1671471.
- Clarify the handling of Copy message and Copy nodes.
- When a map member has been added to ReplicatedMap, make sure it is added to
the backup nodes of the map entries.
- Avoid unnecessary calls to DeltaRequest#addSessionListener on non-primary
nodes.

Modified:
    tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java
    tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java
    
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
    
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java 
(original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java 
Thu Apr 23 08:51:18 2015
@@ -209,9 +209,9 @@ public class DeltaRequest implements Ext
                 case TYPE_LISTENER:
                     SessionListener listener = (SessionListener) 
info.getValue();
                     if (info.getAction() == ACTION_SET) {
-                        session.addSessionListener(listener);
+                        session.addSessionListener(listener,false);
                     } else {
-                        session.removeSessionListener(listener);
+                        session.removeSessionListener(listener,false);
                     }
                     break;
                 default :

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java 
(original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java 
Thu Apr 23 08:51:18 2015
@@ -515,10 +515,14 @@ public class DeltaSession extends Standa
 
     @Override
     public void addSessionListener(SessionListener listener) {
+        addSessionListener(listener, true);
+    }
+
+    public void addSessionListener(SessionListener listener, boolean 
addDeltaRequest) {
         lock();
         try {
             super.addSessionListener(listener);
-            if (deltaRequest != null && listener instanceof 
ReplicatedSessionListener) {
+            if (addDeltaRequest && deltaRequest != null && listener instanceof 
ReplicatedSessionListener) {
                 deltaRequest.addSessionListener(listener);
             }
         } finally {
@@ -528,10 +532,14 @@ public class DeltaSession extends Standa
 
     @Override
     public void removeSessionListener(SessionListener listener) {
+        removeSessionListener(listener, true);
+    }
+
+    public void removeSessionListener(SessionListener listener, boolean 
addDeltaRequest) {
         lock();
         try {
             super.removeSessionListener(listener);
-            if (deltaRequest != null && listener instanceof 
ReplicatedSessionListener) {
+            if (addDeltaRequest && deltaRequest != null && listener instanceof 
ReplicatedSessionListener) {
                 deltaRequest.removeSessionListener(listener);
             }
         } finally {

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
 Thu Apr 23 08:51:18 2015
@@ -79,10 +79,12 @@ public abstract class AbstractReplicated
 
//------------------------------------------------------------------------------
 //              INSTANCE VARIABLES
 
//------------------------------------------------------------------------------
-    private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
+    protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap;
 
     protected abstract int getStateMessageType();
 
+    protected abstract int getReplicateMessageType();
+
 
     /**
      * Timeout for RPC messages, how long we will wait for a reply
@@ -433,7 +435,7 @@ public abstract class AbstractReplicated
                 rentry.lock();
                 try {
                     //construct a diff message
-                    msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+                    msg = new MapMessage(mapContextName, 
getReplicateMessageType(),
                                          true, (Serializable) entry.getKey(), 
null,
                                          rentry.getDiff(),
                                          entry.getPrimary(),
@@ -447,7 +449,7 @@ public abstract class AbstractReplicated
             }
             if (msg == null && complete) {
                 //construct a complete
-                msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP,
+                msg = new MapMessage(mapContextName, getReplicateMessageType(),
                                      false, (Serializable) entry.getKey(),
                                      (Serializable) entry.getValue(),
                                      null, 
entry.getPrimary(),entry.getBackupNodes());
@@ -639,6 +641,7 @@ public abstract class AbstractReplicated
             }
             entry.setProxy(true);
             entry.setBackup(false);
+            entry.setCopy(false);
             entry.setBackupNodes(mapmsg.getBackupNodes());
             entry.setPrimary(mapmsg.getPrimary());
         }
@@ -653,6 +656,7 @@ public abstract class AbstractReplicated
                 entry = new MapEntry<K,V>((K) mapmsg.getKey(), (V) 
mapmsg.getValue());
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
+                entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof 
ReplicatedMapEntry ) {
@@ -661,6 +665,7 @@ public abstract class AbstractReplicated
             } else {
                 entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
                 entry.setProxy(false);
+                entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY);
                 entry.setBackupNodes(mapmsg.getBackupNodes());
                 entry.setPrimary(mapmsg.getPrimary());
                 if (entry.getValue() instanceof ReplicatedMapEntry) {
@@ -708,6 +713,14 @@ public abstract class AbstractReplicated
                 }
             }
         }
+
+        if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) {
+            MapEntry<K, V> entry = innerMap.get(mapmsg.getKey());
+            if (entry != null) {
+                entry.setBackupNodes(mapmsg.getBackupNodes());
+                entry.setPrimary(mapmsg.getPrimary());
+            }
+        }
     }
 
     @Override
@@ -826,6 +839,7 @@ public abstract class AbstractReplicated
                     entry.setPrimary(channel.getLocalMember(false));
                     entry.setBackup(false);
                     entry.setProxy(false);
+                    entry.setCopy(false);
                     Member[] backup = publishEntryInfo(entry.getKey(), 
entry.getValue());
                     entry.setBackupNodes(backup);
                     if ( mapOwner!=null ) 
mapOwner.objectMadePrimay(entry.getKey(),entry.getValue());
@@ -912,7 +926,10 @@ public abstract class AbstractReplicated
             try {
                 Member[] backup = null;
                 MapMessage msg = null;
-                if ( !entry.isBackup() ) {
+                if (entry.isBackup()) {
+                    //select a new backup node
+                    backup = publishEntryInfo(key, entry.getValue());
+                } else if ( entry.isProxy() ) {
                     //make sure we don't retrieve from ourselves
                     msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_RETRIEVE_BACKUP, false,
                                          (Serializable) key, null, null, 
null,null);
@@ -925,31 +942,31 @@ public abstract class AbstractReplicated
                     msg = (MapMessage) resp[0].getMessage();
                     msg.deserialize(getExternalLoaders());
                     backup = entry.getBackupNodes();
-                    if ( entry.getValue() instanceof ReplicatedMapEntry ) {
-                        ReplicatedMapEntry val = 
(ReplicatedMapEntry)entry.getValue();
-                        val.setOwner(getMapOwner());
-                    }
                     if ( msg.getValue()!=null ) entry.setValue((V) 
msg.getValue());
-                }
-                if (entry.isBackup()) {
-                    //select a new backup node
-                    backup = publishEntryInfo(key, entry.getValue());
-                } else if ( entry.isProxy() ) {
+                    
                     //invalidate the previous primary
                     msg = new 
MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup);
                     Member[] dest = getMapMembersExcl(backup);
                     if ( dest!=null && dest.length >0) {
                         getChannel().send(dest, msg, getChannelSendOptions());
                     }
-                    if ( entry.getValue() != null && entry.getValue() 
instanceof ReplicatedMapEntry ) {
+                    if (entry.getValue() instanceof ReplicatedMapEntry) {
                         ReplicatedMapEntry val = 
(ReplicatedMapEntry)entry.getValue();
                         val.setOwner(getMapOwner());
                     }
+                } else if ( entry.isCopy() ) {
+                    backup = getMapMembers();
+                    if (backup.length > 0) {
+                        msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                
(Serializable)key,null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, 
getChannelSendOptions());
+                    }
                 }
                 entry.setPrimary(channel.getLocalMember(false));
                 entry.setBackupNodes(backup);
                 entry.setBackup(false);
                 entry.setProxy(false);
+                entry.setCopy(false);
                 if ( getMapOwner()!=null ) getMapOwner().objectMadePrimay(key, 
entry.getValue());
 
             } catch (Exception x) {
@@ -1006,6 +1023,7 @@ public abstract class AbstractReplicated
         MapEntry<K,V> entry = new MapEntry<K,V>(key,value);
         entry.setBackup(false);
         entry.setProxy(false);
+        entry.setCopy(false);
         entry.setPrimary(channel.getLocalMember(false));
 
         V old = null;
@@ -1164,6 +1182,7 @@ public abstract class AbstractReplicated
     public static class MapEntry<K,V> implements Map.Entry<K,V> {
         private boolean backup;
         private boolean proxy;
+        private boolean copy;
         private Member[] backupNodes;
         private Member primary;
         private K key;
@@ -1200,7 +1219,7 @@ public abstract class AbstractReplicated
         }
 
         public boolean isPrimary() {
-            return (!proxy && !backup);
+            return (!proxy && !backup && !copy);
         }
 
         public boolean isActive() {
@@ -1211,6 +1230,14 @@ public abstract class AbstractReplicated
             this.proxy = proxy;
         }
 
+        public boolean isCopy() {
+            return copy;
+        }
+
+        public void setCopy(boolean copy) {
+            this.copy = copy;
+        }
+
         public boolean isDiffable() {
             return (value instanceof ReplicatedMapEntry) &&
                    ((ReplicatedMapEntry)value).isDiffable();
@@ -1322,6 +1349,7 @@ public abstract class AbstractReplicated
         public static final int MSG_COPY = 9;
         public static final int MSG_STATE_COPY = 10;
         public static final int MSG_ACCESS = 11;
+        public static final int MSG_NOTIFY_MAPMEMBER = 12;
 
         private byte[] mapId;
         private int msgtype;
@@ -1360,6 +1388,7 @@ public abstract class AbstractReplicated
                 case MSG_STATE_COPY: return "MSG_STATE_COPY";
                 case MSG_COPY: return "MSG_COPY";
                 case MSG_ACCESS: return "MSG_ACCESS";
+                case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER";
                 default : return "UNKNOWN";
             }
         }

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
 Thu Apr 23 08:51:18 2015
@@ -125,6 +125,11 @@ public class LazyReplicatedMap<K,V> exte
         return AbstractReplicatedMap.MapMessage.MSG_STATE;
     }
 
+    @Override
+    protected int getReplicateMessageType() {
+        return AbstractReplicatedMap.MapMessage.MSG_BACKUP;
+    }
+
     /**
      * publish info about a map pair (key/value) to other nodes in the cluster
      * @param key Object

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java 
(original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java 
Thu Apr 23 08:51:18 2015
@@ -17,10 +17,14 @@
 package org.apache.catalina.tribes.tipis;
 
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.Member;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
 
 /**
  * All-to-all replication for a hash map implementation. Each node in the 
cluster will carry an identical 
@@ -50,6 +54,8 @@ public class ReplicatedMap<K,V> extends
 
     private static final long serialVersionUID = 1L;
 
+    private final Log log = LogFactory.getLog(ReplicatedMap.class);
+
     
//--------------------------------------------------------------------------
     //              CONSTRUCTORS / DESTRUCTORS
     
//--------------------------------------------------------------------------
@@ -105,7 +111,12 @@ public class ReplicatedMap<K,V> extends
     protected int getStateMessageType() {
         return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
     }
-    
+
+    @Override
+    protected int getReplicateMessageType() {
+        return AbstractReplicatedMap.MapMessage.MSG_COPY;
+    }
+
     /**
      * publish info about a map pair (key/value) to other nodes in the cluster
      * @param key Object
@@ -130,4 +141,94 @@ public class ReplicatedMap<K,V> extends
         return backup;
     }
 
+    @Override
+    public void memberDisappeared(Member member) {
+        boolean removed = false;
+        synchronized (mapMembers) {
+            removed = (mapMembers.remove(member) != null );
+            if (!removed) {
+                if (log.isDebugEnabled()) log.debug("Member["+member+"] 
disappeared, but was not present in the map.");
+                return; //the member was not part of our map.
+            }
+        }
+        if (log.isInfoEnabled())
+            log.info("Member["+member+"] disappeared. Related map entries will 
be relocated to the new node.");
+        long start = System.currentTimeMillis();
+        Iterator<Map.Entry<K,MapEntry<K,V>>> i = 
innerMap.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry<K,MapEntry<K,V>> e = i.next();
+            MapEntry<K,V> entry = innerMap.get(e.getKey());
+            if (entry==null) continue;
+            if (entry.isPrimary()) {
+                try {
+                    Member[] backup = getMapMembers();
+                    if (backup.length > 0) {
+                        MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                
(Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, 
getChannelSendOptions());
+                    }
+                    entry.setBackupNodes(backup);
+                    entry.setPrimary(channel.getLocalMember(false));
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a 
new backup node", x);
+                }
+            } else if (member.equals(entry.getPrimary())) {
+                entry.setPrimary(null);
+            }
+
+            if ( entry.getPrimary() == null &&
+                        entry.isCopy() &&
+                        entry.getBackupNodes()!=null &&
+                        entry.getBackupNodes().length > 0 &&
+                        
entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) {
+                try {
+                    entry.setPrimary(channel.getLocalMember(false));
+                    entry.setBackup(false);
+                    entry.setProxy(false);
+                    entry.setCopy(false);
+                    Member[] backup = getMapMembers();
+                    if (backup.length > 0) {
+                        MapMessage msg = new MapMessage(getMapContextName(), 
MapMessage.MSG_NOTIFY_MAPMEMBER,false,
+                                
(Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup);
+                        getChannel().send(backup, msg, 
getChannelSendOptions());
+                    }
+                    entry.setBackupNodes(backup);
+                    if ( mapOwner!=null ) 
mapOwner.objectMadePrimay(entry.getKey(),entry.getValue());
+
+                } catch (ChannelException x) {
+                    log.error("Unable to relocate[" + entry.getKey() + "] to a 
new backup node", x);
+                }
+            }
+
+        } //while
+        long complete = System.currentTimeMillis() - start;
+        if (log.isInfoEnabled()) log.info("Relocation of map entries was 
complete in " + complete + " ms.");
+    }
+
+    @Override
+    public void mapMemberAdded(Member member) {
+        if ( member.equals(getChannel().getLocalMember(false)) ) return;
+        boolean memberAdded = false;
+        synchronized (mapMembers) {
+            if (!mapMembers.containsKey(member) ) {
+                mapMembers.put(member, new Long(System.currentTimeMillis()));
+                memberAdded = true;
+            }
+        }
+        if ( memberAdded ) {
+            synchronized (stateMutex) {
+                Member[] backup = getMapMembers();
+                Iterator<Map.Entry<K,MapEntry<K,V>>> i = 
innerMap.entrySet().iterator();
+                while (i.hasNext()) {
+                    Map.Entry<K,MapEntry<K,V>> e = i.next();
+                    MapEntry<K,V> entry = innerMap.get(e.getKey());
+                    if ( entry == null ) continue;
+                    if (entry.isPrimary() && 
!inSet(member,entry.getBackupNodes())) {
+                        entry.setBackupNodes(backup);
+                    }
+                }
+            }
+        }
+    }
+
 }
\ No newline at end of file

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1675559&r1=1675558&r2=1675559&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Thu Apr 23 08:51:18 2015
@@ -136,6 +136,10 @@
         to handle nodes being added and removed from the Cluster at run time.
         (markt) 
       </fix>
+      <fix>
+        Avoid unnecessary call of 
<code>DeltaRequest.addSessionListener()</code>
+        in non-primary nodes. (kfujino)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="WebSocket">
@@ -179,6 +183,26 @@
         This fix ensures that <code>MapOwner</code> is set to
         <code>ReplicatedMapEntry</code>. (kfujino)
       </fix>
+      <fix>
+        Clarify the handling of Copy message and Copy nodes. (kfujino)
+      </fix>
+      <fix>
+        Copy node does not need to send the entry data. It is enough to send
+        only the node information of the entry. (kfujino)
+      </fix>
+      <fix>
+        <code>ReplicatedMap</code> should send the Copy message when
+        replicating. (kfujino)
+      </fix>
+      <fix>
+        Fix behavior of <code>ReplicatedMap</code> when member has disappeared.
+        If map entry is primary, rebuild the backup members. If primary node of
+        map entry has disappeared, backup node is promoted to primary. 
(kfujino)
+      </fix>
+      <fix>
+        When a map member has been added to <code>ReplicatedMap</code>, make
+        sure to add it to backup nodes list of all other members. (kfujino)
+      </fix>
     </changelog>
   </subsection>
 </section>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to