Author: kfujino Date: Wed Jan 21 02:37:15 2015 New Revision: 1653423 URL: http://svn.apache.org/r1653423 Log: Backport. Clarify the handling of Copy message and Copy nodes.
Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff ============================================================================== --- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (original) +++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java Wed Jan 21 02:37:15 2015 @@ -71,10 +71,12 @@ public abstract class AbstractReplicated //------------------------------------------------------------------------------ // INSTANCE VARIABLES //------------------------------------------------------------------------------ - private final ConcurrentHashMap<K, MapEntry<K,V>> innerMap; + protected final ConcurrentHashMap<K, MapEntry<K,V>> innerMap; protected abstract int getStateMessageType(); + protected abstract int getReplicateMessageType(); + /** * Timeout for RPC messages, how long we will wait for a reply @@ -426,7 +428,7 @@ public abstract class AbstractReplicated rentry.lock(); try { //construct a diff message - msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, + msg = new MapMessage(mapContextName, getReplicateMessageType(), true, (Serializable) entry.getKey(), null, rentry.getDiff(), entry.getPrimary(), @@ -440,7 +442,7 @@ public abstract class AbstractReplicated } if (msg == null && complete) { //construct a complete - msg = new MapMessage(mapContextName, MapMessage.MSG_BACKUP, + msg = new MapMessage(mapContextName, getReplicateMessageType(), false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), null, 
entry.getPrimary(),entry.getBackupNodes()); @@ -632,6 +634,7 @@ public abstract class AbstractReplicated } entry.setProxy(true); entry.setBackup(false); + entry.setCopy(false); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); } @@ -646,6 +649,7 @@ public abstract class AbstractReplicated entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue()); entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); + entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); if (mapmsg.getValue()!=null && mapmsg.getValue() instanceof ReplicatedMapEntry ) { @@ -654,6 +658,7 @@ public abstract class AbstractReplicated } else { entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); entry.setProxy(false); + entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); if (entry.getValue() instanceof ReplicatedMapEntry) { @@ -692,6 +697,14 @@ public abstract class AbstractReplicated } } } + + if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) { + MapEntry<K, V> entry = innerMap.get(mapmsg.getKey()); + if (entry != null) { + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + } + } } @Override @@ -810,6 +823,7 @@ public abstract class AbstractReplicated entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); + entry.setCopy(false); Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue()); @@ -896,7 +910,10 @@ public abstract class AbstractReplicated try { Member[] backup = null; MapMessage msg = null; - if ( !entry.isBackup() ) { + if (entry.isBackup()) { + //select a new backup node + backup = publishEntryInfo(key, 
entry.getValue()); + } else if ( entry.isProxy() ) { //make sure we don't retrieve from ourselves msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, null, null, null,null); @@ -909,31 +926,31 @@ public abstract class AbstractReplicated msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); - if ( entry.getValue() instanceof ReplicatedMapEntry ) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - val.setOwner(getMapOwner()); - } if ( msg.getValue()!=null ) entry.setValue((V) msg.getValue()); - } - if (entry.isBackup()) { - //select a new backup node - backup = publishEntryInfo(key, entry.getValue()); - } else if ( entry.isProxy() ) { + //invalidate the previous primary msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); Member[] dest = getMapMembersExcl(backup); if ( dest!=null && dest.length >0) { getChannel().send(dest, msg, getChannelSendOptions()); } - if ( entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry ) { + if (entry.getValue() instanceof ReplicatedMapEntry) { ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); val.setOwner(getMapOwner()); } + } else if ( entry.isCopy() ) { + backup = getMapMembers(); + if (backup.length > 0) { + msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, + (Serializable)key,null,null,channel.getLocalMember(false),backup); + getChannel().send(backup, msg, getChannelSendOptions()); + } } entry.setPrimary(channel.getLocalMember(false)); entry.setBackupNodes(backup); entry.setBackup(false); entry.setProxy(false); + entry.setCopy(false); if ( getMapOwner()!=null ) getMapOwner().objectMadePrimary(key, entry.getValue()); } catch (Exception x) { @@ -990,6 +1007,7 @@ public abstract class AbstractReplicated MapEntry<K,V> entry = new MapEntry<>(key, value); 
entry.setBackup(false); entry.setProxy(false); + entry.setCopy(false); entry.setPrimary(channel.getLocalMember(false)); V old = null; @@ -1148,6 +1166,7 @@ public abstract class AbstractReplicated public static class MapEntry<K,V> implements Map.Entry<K,V> { private boolean backup; private boolean proxy; + private boolean copy; private Member[] backupNodes; private Member primary; private K key; @@ -1184,7 +1203,7 @@ public abstract class AbstractReplicated } public boolean isPrimary() { - return (!proxy && !backup); + return (!proxy && !backup && !copy); } public boolean isActive() { @@ -1195,6 +1214,14 @@ public abstract class AbstractReplicated this.proxy = proxy; } + public boolean isCopy() { + return copy; + } + + public void setCopy(boolean copy) { + this.copy = copy; + } + public boolean isDiffable() { return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable(); @@ -1306,6 +1333,7 @@ public abstract class AbstractReplicated public static final int MSG_COPY = 9; public static final int MSG_STATE_COPY = 10; public static final int MSG_ACCESS = 11; + public static final int MSG_NOTIFY_MAPMEMBER = 12; private final byte[] mapId; private final int msgtype; @@ -1344,6 +1372,7 @@ public abstract class AbstractReplicated case MSG_STATE_COPY: return "MSG_STATE_COPY"; case MSG_COPY: return "MSG_COPY"; case MSG_ACCESS: return "MSG_ACCESS"; + case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER"; default : return "UNKNOWN"; } } Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff ============================================================================== --- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Wed 
Jan 21 02:37:15 2015 @@ -124,6 +124,11 @@ public class LazyReplicatedMap<K,V> exte return AbstractReplicatedMap.MapMessage.MSG_STATE; } + @Override + protected int getReplicateMessageType() { + return AbstractReplicatedMap.MapMessage.MSG_BACKUP; + } + /** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object Modified: tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java URL: http://svn.apache.org/viewvc/tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=1653423&r1=1653422&r2=1653423&view=diff ============================================================================== --- tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java (original) +++ tomcat/tc8.0.x/trunk/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java Wed Jan 21 02:37:15 2015 @@ -17,10 +17,14 @@ package org.apache.catalina.tribes.tipis; import java.io.Serializable; +import java.util.Iterator; +import java.util.Map; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.Member; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; /** * All-to-all replication for a hash map implementation. 
Each node in the cluster will carry an identical @@ -49,6 +53,8 @@ public class ReplicatedMap<K,V> extends private static final long serialVersionUID = 1L; + private final Log log = LogFactory.getLog(ReplicatedMap.class); + //-------------------------------------------------------------------------- // CONSTRUCTORS / DESTRUCTORS //-------------------------------------------------------------------------- @@ -105,6 +111,11 @@ public class ReplicatedMap<K,V> extends return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; } + @Override + protected int getReplicateMessageType() { + return AbstractReplicatedMap.MapMessage.MSG_COPY; + } + /** * publish info about a map pair (key/value) to other nodes in the cluster * @param key Object @@ -129,4 +140,67 @@ public class ReplicatedMap<K,V> extends return backup; } + @Override + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null ); + if (!removed) { + if (log.isDebugEnabled()) log.debug("Member["+member+"] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + if (log.isInfoEnabled()) + log.info("Member["+member+"] disappeared. 
Related map entries will be relocated to the new node."); + long start = System.currentTimeMillis(); + Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<K,MapEntry<K,V>> e = i.next(); + MapEntry<K,V> entry = innerMap.get(e.getKey()); + if (entry==null) continue; + if (entry.isPrimary()) { + try { + Member[] backup = getMapMembers(); + if (backup.length > 0) { + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, + (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup); + getChannel().send(backup, msg, getChannelSendOptions()); + } + entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } else if (member.equals(entry.getPrimary())) { + entry.setPrimary(null); + } + + if ( entry.getPrimary() == null && + entry.isCopy() && + entry.getBackupNodes()!=null && + entry.getBackupNodes().length > 0 && + entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) { + try { + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackup(false); + entry.setProxy(false); + entry.setCopy(false); + Member[] backup = getMapMembers(); + if (backup.length > 0) { + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, + (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup); + getChannel().send(backup, msg, getChannelSendOptions()); + } + entry.setBackupNodes(backup); + if ( mapOwner!=null ) mapOwner.objectMadePrimary(entry.getKey(),entry.getValue()); + + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } + + } //while + long complete = System.currentTimeMillis() - start; + if (log.isInfoEnabled()) log.info("Relocation of map entries was complete in " + complete + " ms."); + } } \ 
No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org