Author: kfujino Date: Thu Apr 23 08:51:18 2015 New Revision: 1675559 URL: http://svn.apache.org/r1675559 Log: Backport r1653423, r1660266, r1671471. - Clarify the handling of Copy messages and Copy nodes. - When a map member is added to ReplicatedMap, make sure it is added to the backup nodes of the map entries. - Avoid unnecessary calls to DeltaRequest#addSessionListener on non-primary nodes.
Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java?rev=1675559&r1=1675558&r2=1675559&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaRequest.java Thu Apr 23 08:51:18 2015 @@ -209,9 +209,9 @@ public class DeltaRequest implements Ext case TYPE_LISTENER: SessionListener listener = (SessionListener) info.getValue(); if (info.getAction() == ACTION_SET) { - session.addSessionListener(listener); + session.addSessionListener(listener,false); } else { - session.removeSessionListener(listener); + session.removeSessionListener(listener,false); } break; default : Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java?rev=1675559&r1=1675558&r2=1675559&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaSession.java Thu Apr 23 08:51:18 2015 @@ -515,10 +515,14 @@ public class DeltaSession extends Standa @Override public void addSessionListener(SessionListener listener) { + 
addSessionListener(listener, true); + } + + public void addSessionListener(SessionListener listener, boolean addDeltaRequest) { lock(); try { super.addSessionListener(listener); - if (deltaRequest != null && listener instanceof ReplicatedSessionListener) { + if (addDeltaRequest && deltaRequest != null && listener instanceof ReplicatedSessionListener) { deltaRequest.addSessionListener(listener); } } finally { @@ -528,10 +532,14 @@ public class DeltaSession extends Standa @Override public void removeSessionListener(SessionListener listener) { + removeSessionListener(listener, true); + } + + public void removeSessionListener(SessionListener listener, boolean addDeltaRequest) { lock(); try { super.removeSessionListener(listener); - if (deltaRequest != null && listener instanceof ReplicatedSessionListener) { + if (addDeltaRequest && deltaRequest != null && listener instanceof ReplicatedSessionListener) { deltaRequest.removeSessionListener(listener); } } finally { Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Thu Apr 23 08:51:18 2015 @@ -79,10 +79,12 @@ public abstract class AbstractReplicated //------------------------------------------------------------------------------ // INSTANCE VARIABLES //------------------------------------------------------------------------------ - private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap; + protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap; protected abstract int getStateMessageType(); + protected abstract int getReplicateMessageType(); + 
/** * Timeout for RPC messages, how long we will wait for a reply @@ -433,7 +435,7 @@ public abstract class AbstractReplicated rentry.lock(); try { //construct a diff message - msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, + msg = new MapMessage(mapContextName, getReplicateMessageType(), true, (Serializable) entry.getKey(), null, rentry.getDiff(), entry.getPrimary(), @@ -447,7 +449,7 @@ public abstract class AbstractReplicated } if (msg == null && complete) { //construct a complete - msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, + msg = new MapMessage(mapContextName, getReplicateMessageType(), false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), null, entry.getPrimary(),entry.getBackupNodes()); @@ -639,6 +641,7 @@ public abstract class AbstractReplicated } entry.setProxy(true); entry.setBackup(false); + entry.setCopy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); } @@ -653,6 +656,7 @@ public abstract class AbstractReplicated entry = new MapEntry<K,V>((K) mapmsg.getKey(), (V) mapmsg.getValue()); entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); + entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { @@ -661,6 +665,7 @@ public abstract class AbstractReplicated } else { entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); + entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); if (entry.getValue() instanceof ReplicatedMapEntry) { @@ -708,6 +713,14 @@ public abstract class AbstractReplicated } } } + + if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) { + MapEntry<K, V> entry = innerMap.get(mapmsg.getKey()); + if (entry != null) { + 
entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + } + } } @Override @@ -826,6 +839,7 @@ public abstract class AbstractReplicated entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); + entry.setCopy(false); Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); if ( mapOwner!=null ) mapOwner.objectMadePrimay(entry.getKey(),entry.getValue()); @@ -912,7 +926,10 @@ public abstract class AbstractReplicated try { Member[] backup = null; MapMessage msg = null; - if ( !entry.isBackup() ) { + if (entry.isBackup()) { + //select a new backup node + backup = publishEntryInfo(key, entry.getValue()); + } else if ( entry.isProxy() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null,null); @@ -925,31 +942,31 @@ public abstract class AbstractReplicated msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); - if ( entry.getValue() instanceof ReplicatedMapEntry ) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - val.setOwner(getMapOwner()); - } if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue()); - } - if (entry.isBackup()) { - //select a new backup node - backup = publishEntryInfo(key, entry.getValue()); - } else if ( entry.isProxy() ) { + //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); Member[] dest = getMapMembersExcl(backup); if ( dest!=null && dest.length >0) { getChannel().send(dest, msg, getChannelSendOptions()); } - if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) { + if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); 
val.setOwner(getMapOwner()); } + } else if ( entry.isCopy() ) { + backup = getMapMembers(); + if (backup.length > 0) { + msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, + (Serializable)key,null,null,channel.getLocalMember(false),backup); + getChannel().send(backup, msg, getChannelSendOptions()); + } } entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); + entry.setCopy(false); if ( getMapOwner()!=null ) getMapOwner().objectMadePrimay(key, entry.getValue()); } catch (Exception x) { @@ -1006,6 +1023,7 @@ public abstract class AbstractReplicated MapEntry<K,V> entry = new MapEntry<K,V>(key,value); entry.setBackup(false); entry.setProxy(false); + entry.setCopy(false); entry.setPrimary(channel.getLocalMember(false)); V old = null; @@ -1164,6 +1182,7 @@ public abstract class AbstractReplicated public static class MapEntry<K,V> implements Map.Entry<K,V> { private boolean backup; private boolean proxy; + private boolean copy; private Member[] backupNodes; private Member primary; private K key; @@ -1200,7 +1219,7 @@ public abstract class AbstractReplicated } public boolean isPrimary() { - return (!proxy && !backup); + return (!proxy && !backup && !copy); } public boolean isActive() { @@ -1211,6 +1230,14 @@ public abstract class AbstractReplicated this.proxy = proxy; } + public boolean isCopy() { + return copy; + } + + public void setCopy(boolean copy) { + this.copy = copy; + } + public boolean isDiffable() { return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable(); @@ -1322,6 +1349,7 @@ public abstract class AbstractReplicated public static final int MSG_COPY = 9; public static final int MSG_STATE_COPY = 10; public static final int MSG_ACCESS = 11; + public static final int MSG_NOTIFY_MAPMEMBER = 12; private byte[] mapId; private int msgtype; @@ -1360,6 +1388,7 @@ public abstract class AbstractReplicated case MSG_STATE_COPY: return 
"MSG_STATE_COPY"; case MSG_COPY: return "MSG_COPY"; case MSG_ACCESS: return "MSG_ACCESS"; + case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER"; default : return "UNKNOWN"; } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Thu Apr 23 08:51:18 2015 @@ -125,6 +125,11 @@ public class LazyReplicatedMap<K,V> exte return AbstractReplicatedMap.MapMessage.MSG_STATE; } + @Override + protected int getReplicateMessageType() { + return AbstractReplicatedMap.MapMessage.MSG_BACKUP; + } + /** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1675559&r1=1675558&r2=1675559&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Thu Apr 23 08:51:18 2015 @@ -17,10 +17,14 @@ package org.apache.catalina.tribes.tipis; import java.io.Serializable; +import java.util.Iterator; +import java.util.Map; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.Member; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; /** * All-to-all replication for a hash map implementation. 
Each node in the cluster will carry an identical @@ -50,6 +54,8 @@ public class ReplicatedMap<K,V> extends private static final long serialVersionUID = 1L; + private final Log log = LogFactory.getLog(ReplicatedMap.class); + //-------------------------------------------------------------------------- // CONSTRUCTORS / DESTRUCTORS //-------------------------------------------------------------------------- @@ -105,7 +111,12 @@ public class ReplicatedMap<K,V> extends protected int getStateMessageType() { return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; } - + + @Override + protected int getReplicateMessageType() { + return AbstractReplicatedMap.MapMessage.MSG_COPY; + } + /** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object @@ -130,4 +141,94 @@ public class ReplicatedMap<K,V> extends return backup; } + @Override + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null ); + if (!removed) { + if (log.isDebugEnabled()) log.debug("Member["+member+"] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + if (log.isInfoEnabled()) + log.info("Member["+member+"] disappeared. 
Related map entries will be relocated to the new node."); + long start = System.currentTimeMillis(); + Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<K,MapEntry<K,V>> e = i.next(); + MapEntry<K,V> entry = innerMap.get(e.getKey()); + if (entry==null) continue; + if (entry.isPrimary()) { + try { + Member[] backup = getMapMembers(); + if (backup.length > 0) { + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, + (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup); + getChannel().send(backup, msg, getChannelSendOptions()); + } + entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } else if (member.equals(entry.getPrimary())) { + entry.setPrimary(null); + } + + if ( entry.getPrimary() == null && + entry.isCopy() && + entry.getBackupNodes()!=null && + entry.getBackupNodes().length > 0 && + entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) { + try { + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackup(false); + entry.setProxy(false); + entry.setCopy(false); + Member[] backup = getMapMembers(); + if (backup.length > 0) { + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, + (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup); + getChannel().send(backup, msg, getChannelSendOptions()); + } + entry.setBackupNodes(backup); + if ( mapOwner!=null ) mapOwner.objectMadePrimay(entry.getKey(),entry.getValue()); + + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } + + } //while + long complete = System.currentTimeMillis() - start; + if (log.isInfoEnabled()) log.info("Relocation of map entries was complete in " + complete + " ms."); + } + + 
@Override + public void mapMemberAdded(Member member) { + if ( member.equals(getChannel().getLocalMember(false)) ) return; + boolean memberAdded = false; + synchronized (mapMembers) { + if (!mapMembers.containsKey(member) ) { + mapMembers.put(member, new Long(System.currentTimeMillis())); + memberAdded = true; + } + } + if ( memberAdded ) { + synchronized (stateMutex) { + Member[] backup = getMapMembers(); + Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<K,MapEntry<K,V>> e = i.next(); + MapEntry<K,V> entry = innerMap.get(e.getKey()); + if ( entry == null ) continue; + if (entry.isPrimary() && !inSet(member,entry.getBackupNodes())) { + entry.setBackupNodes(backup); + } + } + } + } + } + } \ No newline at end of file Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1675559&r1=1675558&r2=1675559&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Thu Apr 23 08:51:18 2015 @@ -136,6 +136,10 @@ to handle nodes being added and removed from the Cluster at run time. (markt) </fix> + <fix> + Avoid unnecessary call of <code>DeltaRequest.addSessionListener()</code> + in non-primary nodes. (kfujino) + </fix> </changelog> </subsection> <subsection name="WebSocket"> @@ -179,6 +183,26 @@ This fix ensures that <code>MapOwner</code> is set to <code>ReplicatedMapEntry</code>. (kfujino) </fix> + <fix> + Clarify the handling of Copy message and Copy nodes. (kfujino) + </fix> + <fix> + Copy node does not need to send the entry data. It is enough to send + only the node information of the entry. (kfujino) + </fix> + <fix> + <code>ReplicatedMap</code> should send the Copy message when + replicating. 
(kfujino) + </fix> + <fix> + Fix behavior of <code>ReplicatedMap</code> when member has disappeared. + If map entry is primary, rebuild the backup members. If primary node of + map entry has disappeared, backup node is promoted to primary. (kfujino) + </fix> + <fix> + When a map member has been added to <code>ReplicatedMap</code>, make + sure to add it to backup nodes list of all other members. (kfujino) + </fix> </changelog> </subsection> </section> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org