This is an automated email from the ASF dual-hosted git repository. markt pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/9.0.x by this push: new bbaee85a52 Code clean-up - no functional change bbaee85a52 is described below commit bbaee85a525d53d6366acc498f845c00ed4fa230 Author: Mark Thomas <ma...@apache.org> AuthorDate: Fri May 10 14:40:52 2024 +0100 Code clean-up - no functional change --- .../apache/catalina/tribes/jmx/JmxRegistry.java | 8 +- .../tribes/tipis/AbstractReplicatedMap.java | 690 ++++++++++----------- .../catalina/tribes/tipis/LazyReplicatedMap.java | 170 ++--- .../catalina/tribes/tipis/ReplicatedMap.java | 157 ++--- .../catalina/tribes/tipis/ReplicatedMapEntry.java | 34 +- 5 files changed, 539 insertions(+), 520 deletions(-) diff --git a/java/org/apache/catalina/tribes/jmx/JmxRegistry.java b/java/org/apache/catalina/tribes/jmx/JmxRegistry.java index a22904d963..a833597fbe 100644 --- a/java/org/apache/catalina/tribes/jmx/JmxRegistry.java +++ b/java/org/apache/catalina/tribes/jmx/JmxRegistry.java @@ -36,7 +36,7 @@ public class JmxRegistry { private static final Log log = LogFactory.getLog(JmxRegistry.class); protected static final StringManager sm = StringManager.getManager(JmxRegistry.class); - private static ConcurrentHashMap<String, JmxRegistry> registryCache = new ConcurrentHashMap<>(); + private static ConcurrentHashMap<String,JmxRegistry> registryCache = new ConcurrentHashMap<>(); private MBeanServer mbserver = ManagementFactory.getPlatformMBeanServer(); private ObjectName baseOname = null; @@ -60,8 +60,8 @@ public class JmxRegistry { if (!jmxChannel.isJmxEnabled()) { return null; } - ObjectName baseOn = createBaseObjectName(jmxChannel.getJmxDomain(), - jmxChannel.getJmxPrefix(), channel.getName()); + ObjectName baseOn = + createBaseObjectName(jmxChannel.getJmxDomain(), jmxChannel.getJmxPrefix(), channel.getName()); if (baseOn == null) { return null; } @@ -131,7 +131,7 @@ public class JmxRegistry { } public void unregisterJmx(ObjectName oname) { - if (oname ==null) { + if (oname == null) { return; } try { diff --git a/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java b/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java index 01b133d660..350d2436ac 100644 --- a/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java +++ b/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java @@ -52,9 +52,8 @@ import org.apache.juli.logging.LogFactory; * @param <K> The type of Key * @param <V> The type of Value */ -public abstract class AbstractReplicatedMap<K,V> - implements Map<K,V>, Serializable, RpcCallback, ChannelListener, - MembershipListener, Heartbeat { +public abstract class AbstractReplicatedMap<K, V> + implements Map<K,V>, Serializable, RpcCallback, ChannelListener, MembershipListener, Heartbeat { private static final long serialVersionUID = 1L; @@ -73,10 +72,10 @@ public abstract class AbstractReplicatedMap<K,V> public static final float DEFAULT_LOAD_FACTOR = 0.75f; -//------------------------------------------------------------------------------ -// INSTANCE VARIABLES -//------------------------------------------------------------------------------ - protected final ConcurrentMap<K, MapEntry<K,V>> innerMap; + // ------------------------------------------------------------------------------ + // INSTANCE VARIABLES + // ------------------------------------------------------------------------------ + protected final ConcurrentMap<K,MapEntry<K,V>> innerMap; protected abstract int getStateMessageType(); @@ -96,9 +95,7 @@ public abstract class AbstractReplicatedMap<K,V> */ protected transient RpcChannel rpcChannel; /** - * The Map context name makes this map unique, this - * allows us to have more than one map shared - * through one channel + * The Map context name makes this map unique, this allows us to have more than one map shared through one channel */ protected transient byte[] mapContextName; /** @@ -112,7 +109,7 @@ public abstract class AbstractReplicatedMap<K,V> /** * A list of members in our map */ - protected final transient HashMap<Member, Long> mapMembers = new HashMap<>(); + protected final transient HashMap<Member,Long> mapMembers = new HashMap<>(); /** * Our default send options */ @@ -127,16 +124,13 @@ public abstract class AbstractReplicatedMap<K,V> protected transient ClassLoader[] externalLoaders; /** - * The node we are currently backing up data to, this index will rotate - * on a round robin basis + * The node we are currently backing up data to, this index will rotate on a round robin basis */ protected transient int currentNode = 0; /** - * Since the map keeps internal membership - * this is the timeout for a ping message to be responded to - * If a remote map doesn't respond within this timeframe, - * its considered dead. + * Since the map keeps internal membership this is the timeout for a ping message to be responded to If a remote map + * doesn't respond within this timeframe, its considered dead. */ protected transient long accessTimeout = 5000; @@ -150,39 +144,33 @@ public abstract class AbstractReplicatedMap<K,V> */ private transient volatile State state = State.NEW; -//------------------------------------------------------------------------------ -// map owner interface -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // map owner interface + // ------------------------------------------------------------------------------ public interface MapOwner { void objectMadePrimary(Object key, Object value); } -//------------------------------------------------------------------------------ -// CONSTRUCTORS -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // CONSTRUCTORS + // ------------------------------------------------------------------------------ /** * Creates a new map. - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + * @param loadFactor float - load factor, see HashMap * @param channelSendOptions Send options - * @param cls - a list of classloaders to be used for deserialization of objects. - * @param terminate - Flag for whether to terminate this map that failed to start. + * @param cls - a list of classloaders to be used for deserialization of objects. + * @param terminate - Flag for whether to terminate this map that failed to start. */ - public AbstractReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - float loadFactor, - int channelSendOptions, - ClassLoader[] cls, - boolean terminate) { + public AbstractReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, + int initialCapacity, float loadFactor, int channelSendOptions, ClassLoader[] cls, boolean terminate) { innerMap = new ConcurrentHashMap<>(initialCapacity, loadFactor, 15); init(owner, channel, mapContextName, timeout, channelSendOptions, cls, terminate); @@ -190,30 +178,33 @@ public abstract class AbstractReplicatedMap<K,V> /** * Helper methods, wraps a single member in an array + * * @param m Member + * * @return Member[] */ protected Member[] wrap(Member m) { - if ( m == null ) { + if (m == null) { return new Member[0]; } else { - return new Member[] {m}; + return new Member[] { m }; } } /** - * Initializes the map by creating the RPC channel, registering itself as a channel listener - * This method is also responsible for initiating the state transfer - * @param owner Object - * @param channel Channel - * @param mapContextName String - * @param timeout long + * Initializes the map by creating the RPC channel, registering itself as a channel listener This method is also + * responsible for initiating the state transfer + * + * @param owner Object + * @param channel Channel + * @param mapContextName String + * @param timeout long * @param channelSendOptions int - * @param cls ClassLoader[] - * @param terminate - Flag for whether to terminate this map that failed to start. + * @param cls ClassLoader[] + * @param terminate - Flag for whether to terminate this map that failed to start. */ - protected void init(MapOwner owner, Channel channel, String mapContextName, - long timeout, int channelSendOptions,ClassLoader[] cls, boolean terminate) { + protected void init(MapOwner owner, Channel channel, String mapContextName, long timeout, int channelSendOptions, + ClassLoader[] cls, boolean terminate) { long start = System.currentTimeMillis(); if (log.isInfoEnabled()) { log.info(sm.getString("abstractReplicatedMap.init.start", mapContextName)); @@ -225,64 +216,57 @@ public abstract class AbstractReplicatedMap<K,V> this.rpcTimeout = timeout; this.mapname = mapContextName; - //unique context is more efficient if it is stored as bytes + // unique context is more efficient if it is stored as bytes this.mapContextName = mapContextName.getBytes(StandardCharsets.ISO_8859_1); - if ( log.isTraceEnabled() ) { - log.trace("Created Lazy Map with name:"+mapContextName+", bytes:"+Arrays.toString(this.mapContextName)); + if (log.isTraceEnabled()) { + log.trace( + "Created Lazy Map with name:" + mapContextName + ", bytes:" + Arrays.toString(this.mapContextName)); } - //create an rpc channel and add the map as a listener + // create an rpc channel and add the map as a listener this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); - //add this map as a message listener + // add this map as a message listener this.channel.addChannelListener(this); - //listen for membership notifications + // listen for membership notifications this.channel.addMembershipListener(this); try { - //broadcast our map, this just notifies other members of our existence + // broadcast our map, this just notifies other members of our existence broadcast(MapMessage.MSG_INIT, true); - //transfer state from another map + // transfer state from another map transferState(); - //state is transferred, we are ready for messaging + // state is transferred, we are ready for messaging broadcast(MapMessage.MSG_START, true); } catch (ChannelException x) { log.warn(sm.getString("abstractReplicatedMap.unableSend.startMessage")); if (terminate) { breakdown(); - throw new RuntimeException(sm.getString("abstractReplicatedMap.unableStart"),x); + throw new RuntimeException(sm.getString("abstractReplicatedMap.unableStart"), x); } } this.state = State.INITIALIZED; long complete = System.currentTimeMillis() - start; if (log.isInfoEnabled()) { - log.info(sm.getString("abstractReplicatedMap.init.completed", - mapContextName, Long.toString(complete))); + log.info(sm.getString("abstractReplicatedMap.init.completed", mapContextName, Long.toString(complete))); } } /** - * Sends a ping out to all the members in the cluster, not just map members - * that this map is alive. + * Sends a ping out to all the members in the cluster, not just map members that this map is alive. + * * @param timeout long + * * @throws ChannelException Send error */ protected void ping(long timeout) throws ChannelException { - MapMessage msg = new MapMessage(this.mapContextName, - MapMessage.MSG_PING, - false, - null, - null, - null, - channel.getLocalMember(false), - null); - if ( channel.getMembers().length > 0 ) { + MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PING, false, null, null, null, + channel.getLocalMember(false), null); + if (channel.getMembers().length > 0) { try { - //send a ping, wait for all nodes to reply - Response[] resp = rpcChannel.send(channel.getMembers(), - msg, RpcChannel.ALL_REPLY, - (channelSendOptions), - (int) accessTimeout); + // send a ping, wait for all nodes to reply + Response[] resp = rpcChannel.send(channel.getMembers(), msg, RpcChannel.ALL_REPLY, (channelSendOptions), + (int) accessTimeout); for (Response response : resp) { MapMessage mapMsg = (MapMessage) response.getMessage(); try { @@ -294,8 +278,7 @@ public abstract class AbstractReplicatedMap<K,V> } else if (state == State.STATETRANSFERRED) { synchronized (mapMembers) { if (log.isInfoEnabled()) { - log.info(sm.getString("abstractReplicatedMap.ping.stateTransferredMember", - member)); + log.info(sm.getString("abstractReplicatedMap.ping.stateTransferredMember", member)); } if (mapMembers.containsKey(member)) { mapMembers.put(member, Long.valueOf(System.currentTimeMillis())); @@ -303,8 +286,7 @@ public abstract class AbstractReplicatedMap<K,V> } } else { if (log.isInfoEnabled()) { - log.info(sm.getString("abstractReplicatedMap.mapMember.unavailable", - member)); + log.info(sm.getString("abstractReplicatedMap.mapMember.unavailable", member)); } } } catch (ClassNotFoundException | IOException e) { @@ -320,22 +302,23 @@ public abstract class AbstractReplicatedMap<K,V> throw ce; } } - //update our map of members, expire some if we didn't receive a ping back + // update our map of members, expire some if we didn't receive a ping back synchronized (mapMembers) { Member[] members = mapMembers.keySet().toArray(new Member[0]); long now = System.currentTimeMillis(); for (Member member : members) { long access = mapMembers.get(member).longValue(); - if ( (now - access) > timeout ) { + if ((now - access) > timeout) { log.warn(sm.getString("abstractReplicatedMap.ping.timeout", member, mapname)); memberDisappeared(member); } } - }//synch + } // synch } /** * We have received a member alive notification + * * @param member Member */ protected void memberAlive(Member member) { @@ -347,22 +330,23 @@ public abstract class AbstractReplicatedMap<K,V> /** * Helper method to broadcast a message to all members in a channel + * * @param msgtype int - * @param rpc boolean + * @param rpc boolean + * * @throws ChannelException Send error */ protected void broadcast(int msgtype, boolean rpc) throws ChannelException { Member[] members = channel.getMembers(); // No destination. - if (members.length == 0 ) { + if (members.length == 0) { return; } - //send out a map membership message, only wait for the first reply - MapMessage msg = new MapMessage(this.mapContextName, msgtype, - false, null, null, null, channel.getLocalMember(false), null); - if ( rpc) { - Response[] resp = rpcChannel.send(members, msg, - RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); + // send out a map membership message, only wait for the first reply + MapMessage msg = new MapMessage(this.mapContextName, msgtype, false, null, null, null, + channel.getLocalMember(false), null); + if (rpc) { + Response[] resp = rpcChannel.send(members, msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); if (resp.length > 0) { for (Response response : resp) { mapMemberAdded(response.getSource()); @@ -372,7 +356,7 @@ public abstract class AbstractReplicatedMap<K,V> log.warn(sm.getString("abstractReplicatedMap.broadcast.noReplies")); } } else { - channel.send(channel.getMembers(),msg,channelSendOptions); + channel.send(channel.getMembers(), msg, channelSendOptions); } } @@ -382,8 +366,11 @@ public abstract class AbstractReplicatedMap<K,V> this.rpcChannel.breakdown(); } if (this.channel != null) { - try {broadcast(MapMessage.MSG_STOP,false); }catch ( Exception ignore){} - //cleanup + try { + broadcast(MapMessage.MSG_STOP, false); + } catch (Exception ignore) { + } + // cleanup this.channel.removeChannelListener(this); this.channel.removeMembershipListener(this); } @@ -404,23 +391,24 @@ public abstract class AbstractReplicatedMap<K,V> @Override public boolean equals(Object o) { - if ( !(o instanceof AbstractReplicatedMap)) { + if (!(o instanceof AbstractReplicatedMap)) { return false; } - if ( !(o.getClass().equals(this.getClass())) ) { + if (!(o.getClass().equals(this.getClass()))) { return false; } @SuppressWarnings("unchecked") - AbstractReplicatedMap<K,V> other = (AbstractReplicatedMap<K,V>)o; - return Arrays.equals(mapContextName,other.mapContextName); + AbstractReplicatedMap<K,V> other = (AbstractReplicatedMap<K,V>) o; + return Arrays.equals(mapContextName, other.mapContextName); } -//------------------------------------------------------------------------------ -// GROUP COM INTERFACES -//------------------------------------------------------------------------------ - public Member[] getMapMembers(HashMap<Member, Long> members) { + // ------------------------------------------------------------------------------ + // GROUP COM INTERFACES + // ------------------------------------------------------------------------------ + public Member[] getMapMembers(HashMap<Member,Long> members) { return members.keySet().toArray(new Member[0]); } + public Member[] getMapMembers() { synchronized (mapMembers) { return getMapMembers(mapMembers); @@ -433,7 +421,7 @@ public abstract class AbstractReplicatedMap<K,V> } synchronized (mapMembers) { @SuppressWarnings("unchecked") // mapMembers has the correct type - HashMap<Member, Long> list = (HashMap<Member, Long>)mapMembers.clone(); + HashMap<Member,Long> list = (HashMap<Member,Long>) mapMembers.clone(); for (Member member : exclude) { list.remove(member); } @@ -443,52 +431,49 @@ public abstract class AbstractReplicatedMap<K,V> /** - * Replicates any changes to the object since the last time - * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br> - * @param key The object to replicate - * @param complete - if set to true, the object is replicated to its backup - * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will - * be replicated + * Replicates any changes to the object since the last time The object has to be primary, ie, if the object is a + * proxy or a backup, it will not be replicated<br> + * + * @param key The object to replicate + * @param complete - if set to true, the object is replicated to its backup if set to false, only objects that + * implement ReplicatedMapEntry and the isDirty() returns true will be replicated */ public void replicate(Object key, boolean complete) { - if ( log.isTraceEnabled() ) { - log.trace("Replicate invoked on key:"+key); + if (log.isTraceEnabled()) { + log.trace("Replicate invoked on key:" + key); } MapEntry<K,V> entry = innerMap.get(key); - if ( entry == null ) { + if (entry == null) { return; } - if ( !entry.isSerializable() ) { + if (!entry.isSerializable()) { return; } - if (entry.isPrimary() && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0) { - //check to see if we need to replicate this object isDirty()||complete || isAccessReplicate() + if (entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { + // check to see if we need to replicate this object isDirty()||complete || isAccessReplicate() ReplicatedMapEntry rentry = null; if (entry.getValue() instanceof ReplicatedMapEntry) { - rentry = (ReplicatedMapEntry)entry.getValue(); + rentry = (ReplicatedMapEntry) entry.getValue(); } boolean isDirty = rentry != null && rentry.isDirty(); boolean isAccess = rentry != null && rentry.isAccessReplicate(); boolean repl = complete || isDirty || isAccess; if (!repl) { - if ( log.isTraceEnabled() ) { - log.trace("Not replicating:"+key+", no change made"); + if (log.isTraceEnabled()) { + log.trace("Not replicating:" + key + ", no change made"); } return; } - //check to see if the message is diffable + // check to see if the message is diffable MapMessage msg = null; if (rentry != null && rentry.isDiffable() && (isDirty || complete)) { rentry.lock(); try { - //construct a diff message - msg = new MapMessage(mapContextName, getReplicateMessageType(), - true, (Serializable) entry.getKey(), null, - rentry.getDiff(), - entry.getPrimary(), - entry.getBackupNodes()); + // construct a diff message + msg = new MapMessage(mapContextName, getReplicateMessageType(), true, (Serializable) entry.getKey(), + null, rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes()); rentry.resetDiff(); } catch (IOException x) { log.error(sm.getString("abstractReplicatedMap.unable.diffObject"), x); @@ -497,20 +482,17 @@ public abstract class AbstractReplicatedMap<K,V> } } if (msg == null && complete) { - //construct a complete - msg = new MapMessage(mapContextName, getReplicateMessageType(), - false, (Serializable) entry.getKey(), - (Serializable) entry.getValue(), - null, entry.getPrimary(),entry.getBackupNodes()); + // construct a complete + msg = new MapMessage(mapContextName, getReplicateMessageType(), false, (Serializable) entry.getKey(), + (Serializable) entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes()); } if (msg == null) { - //construct a access message - msg = new MapMessage(mapContextName, MapMessage.MSG_ACCESS, - false, (Serializable) entry.getKey(), null, null, entry.getPrimary(), - entry.getBackupNodes()); + // construct a access message + msg = new MapMessage(mapContextName, MapMessage.MSG_ACCESS, false, (Serializable) entry.getKey(), null, + null, entry.getPrimary(), entry.getBackupNodes()); } try { - if ( channel!=null && entry.getBackupNodes()!= null && entry.getBackupNodes().length > 0 ) { + if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { if (rentry != null) { rentry.setLastTimeReplicated(System.currentTimeMillis()); } @@ -519,18 +501,18 @@ public abstract class AbstractReplicatedMap<K,V> } catch (ChannelException x) { log.error(sm.getString("abstractReplicatedMap.unable.replicate"), x); } - } //end if + } // end if } /** - * This can be invoked by a periodic thread to replicate out any changes. - * For maps that don't store objects that implement ReplicatedMapEntry, this - * method should be used infrequently to avoid large amounts of data transfer + * This can be invoked by a periodic thread to replicate out any changes. For maps that don't store objects that + * implement ReplicatedMapEntry, this method should be used infrequently to avoid large amounts of data transfer + * * @param complete boolean */ public void replicate(boolean complete) { - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { replicate(e.getKey(), complete); } } @@ -540,9 +522,10 @@ public abstract class AbstractReplicatedMap<K,V> Member[] members = getMapMembers(); Member backup = members.length > 0 ? members[0] : null; if (backup != null) { - MapMessage msg = new MapMessage(mapContextName, getStateMessageType(), false, - null, null, null, null, null); - Response[] resp = rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); + MapMessage msg = + new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null); + Response[] resp = rpcChannel.send(new Member[] { backup }, msg, RpcChannel.FIRST_REPLY, + channelSendOptions, rpcTimeout); if (resp.length > 0) { synchronized (stateMutex) { msg = (MapMessage) resp[0].getMessage(); @@ -550,7 +533,7 @@ public abstract class AbstractReplicatedMap<K,V> ArrayList<?> list = (ArrayList<?>) msg.getValue(); for (Object o : list) { messageReceived((Serializable) o, resp[0].getSource()); - } //for + } // for } stateTransferred = true; } else { @@ -565,52 +548,53 @@ public abstract class AbstractReplicatedMap<K,V> @Override public Serializable replyRequest(Serializable msg, final Member sender) { - if (! (msg instanceof MapMessage)) { + if (!(msg instanceof MapMessage)) { return null; } MapMessage mapmsg = (MapMessage) msg; - //map init request + // map init request if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { mapmsg.setPrimary(channel.getLocalMember(false)); return mapmsg; } - //map start request + // map start request if (mapmsg.getMsgType() == MapMessage.MSG_START) { mapmsg.setPrimary(channel.getLocalMember(false)); mapMemberAdded(sender); return mapmsg; } - //backup request + // backup request if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) { MapEntry<K,V> entry = innerMap.get(mapmsg.getKey()); - if (entry == null || (!entry.isSerializable()) ) { + if (entry == null || (!entry.isSerializable())) { return null; } - mapmsg.setValue( (Serializable) entry.getValue()); + mapmsg.setValue((Serializable) entry.getValue()); return mapmsg; } - //state transfer request + // state transfer request if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) { - synchronized (stateMutex) { //make sure we don't do two things at the same time + synchronized (stateMutex) { // make sure we don't do two things at the same time ArrayList<MapMessage> list = new ArrayList<>(); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if ( entry != null && entry.isSerializable() ) { + if (entry != null && entry.isSerializable()) { boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY); - MapMessage me = new MapMessage(mapContextName, - copy?MapMessage.MSG_COPY:MapMessage.MSG_PROXY, - false, (Serializable) entry.getKey(), copy?(Serializable) entry.getValue():null, null, entry.getPrimary(),entry.getBackupNodes()); + MapMessage me = + new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false, + (Serializable) entry.getKey(), copy ? (Serializable) entry.getValue() : null, + null, entry.getPrimary(), entry.getBackupNodes()); list.add(me); } } mapmsg.setValue(list); return mapmsg; - } //synchronized + } // synchronized } // ping @@ -626,8 +610,8 @@ public abstract class AbstractReplicatedMap<K,V> @Override public void leftOver(Serializable msg, Member sender) { - //left over membership messages - if (! (msg instanceof MapMessage)) { + // left over membership messages + if (!(msg instanceof MapMessage)) { return; } @@ -650,25 +634,24 @@ public abstract class AbstractReplicatedMap<K,V> } else { // other messages are ignored. if (log.isInfoEnabled()) { - log.info(sm.getString("abstractReplicatedMap.leftOver.ignored", - mapmsg.getTypeDesc())); + log.info(sm.getString("abstractReplicatedMap.leftOver.ignored", mapmsg.getTypeDesc())); } } } catch (IOException | ClassNotFoundException x) { - log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"),x); + log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"), x); } } @SuppressWarnings("unchecked") @Override public void messageReceived(Serializable msg, Member sender) { - if (! (msg instanceof MapMessage)) { + if (!(msg instanceof MapMessage)) { return; } MapMessage mapmsg = (MapMessage) msg; - if ( log.isTraceEnabled() ) { - log.trace("Map["+mapname+"] received message:"+mapmsg); + if (log.isTraceEnabled()) { + log.trace("Map[" + mapname + "] received message:" + mapmsg); } try { @@ -677,8 +660,8 @@ public abstract class AbstractReplicatedMap<K,V> log.error(sm.getString("abstractReplicatedMap.unable.deserialize.MapMessage"), x); return; } - if ( log.isTraceEnabled() ) { - log.trace("Map message received from:"+sender.getName()+" msg:"+mapmsg); + if (log.isTraceEnabled()) { + log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg); } if (mapmsg.getMsgType() == MapMessage.MSG_START) { mapMemberAdded(mapmsg.getPrimary()); @@ -690,7 +673,7 @@ public abstract class AbstractReplicatedMap<K,V> if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { MapEntry<K,V> entry = innerMap.get(mapmsg.getKey()); - if ( entry==null ) { + if (entry == null) { entry = new MapEntry<>((K) mapmsg.getKey(), (V) mapmsg.getValue()); MapEntry<K,V> old = innerMap.putIfAbsent(entry.getKey(), entry); if (old != null) { @@ -717,8 +700,8 @@ public abstract class AbstractReplicatedMap<K,V> entry.setCopy(mapmsg.getMsgType() == MapMessage.MSG_COPY); entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); - if (mapmsg.getValue() instanceof ReplicatedMapEntry ) { - ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); + if (mapmsg.getValue() instanceof ReplicatedMapEntry) { + ((ReplicatedMapEntry) mapmsg.getValue()).setOwner(getMapOwner()); } } else { entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); @@ -738,33 +721,33 @@ public abstract class AbstractReplicatedMap<K,V> diff.unlock(); } } else { - if ( mapmsg.getValue()!=null ) { + if (mapmsg.getValue() != null) { if (mapmsg.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); + ReplicatedMapEntry re = (ReplicatedMapEntry) mapmsg.getValue(); re.setOwner(getMapOwner()); entry.setValue((V) re); } else { entry.setValue((V) mapmsg.getValue()); } } else { - ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); + ((ReplicatedMapEntry) entry.getValue()).setOwner(getMapOwner()); } - } //end if - } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); + } // end if + } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry re = (ReplicatedMapEntry) mapmsg.getValue(); re.setOwner(getMapOwner()); entry.setValue((V) re); } else { - if ( mapmsg.getValue()!=null ) { + if (mapmsg.getValue() != null) { entry.setValue((V) mapmsg.getValue()); } - } //end if - } //end if + } // end if + } // end if innerMap.put(entry.getKey(), entry); - } //end if + } // end if if (mapmsg.getMsgType() == MapMessage.MSG_ACCESS) { - MapEntry<K, V> entry = innerMap.get(mapmsg.getKey()); + MapEntry<K,V> entry = innerMap.get(mapmsg.getKey()); if (entry != null) { entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); @@ -775,7 +758,7 @@ public abstract class AbstractReplicatedMap<K,V> } if (mapmsg.getMsgType() == MapMessage.MSG_NOTIFY_MAPMEMBER) { - MapEntry<K, V> entry = innerMap.get(mapmsg.getKey()); + MapEntry<K,V> entry = innerMap.get(mapmsg.getKey()); if (entry != null) { entry.setBackupNodes(mapmsg.getBackupNodes()); entry.setPrimary(mapmsg.getPrimary()); @@ -790,30 +773,30 @@ public abstract class AbstractReplicatedMap<K,V> public boolean accept(Serializable msg, Member sender) { boolean result = false; if (msg instanceof MapMessage) { - if ( log.isTraceEnabled() ) { - log.trace("Map["+mapname+"] accepting...."+msg); + if (log.isTraceEnabled()) { + log.trace("Map[" + mapname + "] accepting...." + msg); } - result = Arrays.equals(mapContextName, ( (MapMessage) msg).getMapId()); - if ( log.isTraceEnabled() ) { - log.trace("Msg["+mapname+"] accepted["+result+"]...."+msg); + result = Arrays.equals(mapContextName, ((MapMessage) msg).getMapId()); + if (log.isTraceEnabled()) { + log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg); } } return result; } public void mapMemberAdded(Member member) { - if ( member.equals(getChannel().getLocalMember(false)) ) { + if (member.equals(getChannel().getLocalMember(false))) { return; } boolean memberAdded = false; - //select a backup node if we don't have one + // select a backup node if we don't have one Member mapMember = getChannel().getMember(member); if (mapMember == null) { log.warn(sm.getString("abstractReplicatedMap.mapMemberAdded.nullMember", member)); return; } synchronized (mapMembers) { - if (!mapMembers.containsKey(mapMember) ) { + if (!mapMembers.containsKey(mapMember)) { if (log.isInfoEnabled()) { log.info(sm.getString("abstractReplicatedMap.mapMemberAdded.added", mapMember)); } @@ -821,11 +804,11 @@ public abstract class AbstractReplicatedMap<K,V> memberAdded = true; } } - if ( memberAdded ) { + if (memberAdded) { synchronized (stateMutex) { - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if ( entry == null ) { + if (entry == null) { continue; } if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { @@ -835,15 +818,15 @@ public abstract class AbstractReplicatedMap<K,V> entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { log.error(sm.getString("abstractReplicatedMap.unableSelect.backup"), x); - } //catch - } //end if - } //while - } //synchronized - }//end if + } // catch + } // end if + } // while + } // synchronized + } // end if } public boolean inSet(Member m, Member[] set) { - if ( set == null ) { + if (set == null) { return false; } boolean result = false; @@ -875,19 +858,19 @@ public abstract class AbstractReplicatedMap<K,V> @Override public void memberAdded(Member member) { - //do nothing + // do nothing } @Override public void memberDisappeared(Member member) { boolean removed = false; synchronized (mapMembers) { - removed = (mapMembers.remove(member) != null ); + removed = (mapMembers.remove(member) != null); if (!removed) { if (log.isDebugEnabled()) { log.debug(sm.getString("replicatedMap.member.disappeared.unknown", member)); } - return; //the member was not part of our map. + return; // the member was not part of our map. } } if (log.isInfoEnabled()) { @@ -898,10 +881,10 @@ public abstract class AbstractReplicatedMap<K,V> while (i.hasNext()) { Map.Entry<K,MapEntry<K,V>> e = i.next(); MapEntry<K,V> entry = innerMap.get(e.getKey()); - if (entry==null) { + if (entry == null) { continue; } - if (entry.isPrimary() && inSet(member,entry.getBackupNodes())) { + if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) { if (log.isDebugEnabled()) { log.debug(sm.getString("abstractReplicatedMap.newBackup")); } @@ -917,23 +900,18 @@ public abstract class AbstractReplicatedMap<K,V> log.debug(sm.getString("abstractReplicatedMap.primaryDisappeared")); } entry.setPrimary(null); - } //end if - - if ( entry.isProxy() && - entry.getPrimary() == null && - entry.getBackupNodes()!=null && - entry.getBackupNodes().length == 1 && - entry.getBackupNodes()[0].equals(member) ) { - //remove proxies that have no backup nor primaries + } // end if + + if (entry.isProxy() && entry.getPrimary() == null && entry.getBackupNodes() != null && + entry.getBackupNodes().length == 1 && entry.getBackupNodes()[0].equals(member)) { + // remove proxies that have no backup nor primaries if (log.isDebugEnabled()) { log.debug(sm.getString("abstractReplicatedMap.removeOrphan")); } i.remove(); - } else if ( entry.getPrimary() == null && - entry.isBackup() && - entry.getBackupNodes()!=null && - entry.getBackupNodes().length == 1 && - entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) { + } else if (entry.getPrimary() == null && entry.isBackup() && entry.getBackupNodes() != null && + entry.getBackupNodes().length == 1 && + entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) { try { if (log.isDebugEnabled()) { log.debug(sm.getString("abstractReplicatedMap.newPrimary")); @@ -944,8 +922,8 @@ public abstract class AbstractReplicatedMap<K,V> entry.setCopy(false); Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); - if ( mapOwner!=null ) { - mapOwner.objectMadePrimary(entry.getKey(),entry.getValue()); + if (mapOwner != null) { + mapOwner.objectMadePrimary(entry.getKey(), entry.getValue()); } } catch (ChannelException x) { @@ -953,11 +931,10 @@ public abstract class AbstractReplicatedMap<K,V> } } - } //while + } // while long complete = System.currentTimeMillis() - start; if (log.isInfoEnabled()) { - log.info(sm.getString("abstractReplicatedMap.relocate.complete", - Long.toString(complete))); + log.info(sm.getString("abstractReplicatedMap.relocate.complete", Long.toString(complete))); } } @@ -975,13 +952,14 @@ public abstract class AbstractReplicatedMap<K,V> return node; } } + public Member getNextBackupNode() { Member[] members = getMapMembers(); int node = getNextBackupIndex(); - if ( members.length == 0 || node==-1) { + if (members.length == 0 || node == -1) { return null; } - if ( node >= members.length ) { + if (node >= members.length) { node = 0; } return members[node]; @@ -989,9 +967,12 @@ public abstract class AbstractReplicatedMap<K,V> /** * Publish info about a map pair (key/value) to other nodes in the cluster. - * @param key Object + * + * @param key Object * @param value Object + * * @return Member - the backup node + * * @throws ChannelException Cluster error */ protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; @@ -1002,31 +983,33 @@ public abstract class AbstractReplicatedMap<K,V> if (this.state.isAvailable()) { ping(accessTimeout); } - }catch ( Exception x ) { - log.error(sm.getString("abstractReplicatedMap.heartbeat.failed"),x); + } catch (Exception x) { + log.error(sm.getString("abstractReplicatedMap.heartbeat.failed"), x); } } -//------------------------------------------------------------------------------ -// METHODS TO OVERRIDE -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + // ------------------------------------------------------------------------------ @Override public V remove(Object key) { - return remove(key,true); + return remove(key, true); } + public V remove(Object key, boolean notify) { MapEntry<K,V> entry = innerMap.remove(key); try { if (getMapMembers().length > 0 && notify) { - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null,null); + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, + null, null, null, null); getChannel().send(getMapMembers(), msg, getChannelSendOptions()); } - } catch ( ChannelException x ) { - log.error(sm.getString("abstractReplicatedMap.unable.remove"),x); + } catch (ChannelException x) { + log.error(sm.getString("abstractReplicatedMap.unable.remove"), x); } - return entry!=null?entry.getValue():null; + return entry != null ? entry.getValue() : null; } public MapEntry<K,V> getInternal(Object key) { @@ -1038,58 +1021,60 @@ public abstract class AbstractReplicatedMap<K,V> public V get(Object key) { MapEntry<K,V> entry = innerMap.get(key); if (log.isTraceEnabled()) { - log.trace("Requesting id:"+key+" entry:"+entry); + log.trace("Requesting id:" + key + " entry:" + entry); } - if ( entry == null ) { + if (entry == null) { return null; } - if ( !entry.isPrimary() ) { - //if the message is not primary, we need to retrieve the latest value + if (!entry.isPrimary()) { + // if the message is not primary, we need to retrieve the latest value try { Member[] backup = null; MapMessage msg = null; if (entry.isBackup()) { - //select a new backup node + // select a new backup node backup = publishEntryInfo(key, entry.getValue()); - } else if ( entry.isProxy() ) { - //make sure we don't retrieve from ourselves - msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, - (Serializable) key, null, null, null,null); - Response[] resp = getRpcChannel().send(entry.getBackupNodes(),msg, RpcChannel.FIRST_REPLY, getChannelSendOptions(), getRpcTimeout()); + } else if (entry.isProxy()) { + // make sure we don't retrieve from ourselves + msg = new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable) key, + null, null, null, null); + Response[] resp = getRpcChannel().send(entry.getBackupNodes(), msg, RpcChannel.FIRST_REPLY, + getChannelSendOptions(), getRpcTimeout()); if (resp == null || resp.length == 0 || resp[0].getMessage() == null) { - //no responses + // no responses log.warn(sm.getString("abstractReplicatedMap.unable.retrieve", key)); return null; } msg = (MapMessage) resp[0].getMessage(); msg.deserialize(getExternalLoaders()); backup = entry.getBackupNodes(); - if ( msg.getValue()!=null ) { + if (msg.getValue() != null) { entry.setValue((V) msg.getValue()); } // notify member - msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, - (Serializable)entry.getKey(), null, null, channel.getLocalMember(false), backup); - if ( backup != null && backup.length > 0) { + msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, + (Serializable) entry.getKey(), null, null, channel.getLocalMember(false), backup); + if (backup != null && backup.length > 0) { getChannel().send(backup, msg, getChannelSendOptions()); } - //invalidate the previous primary - msg = new MapMessage(getMapContextName(),MapMessage.MSG_PROXY,false,(Serializable)key,null,null,channel.getLocalMember(false),backup); + // invalidate the previous primary + msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null, + null, channel.getLocalMember(false), backup); Member[] dest = getMapMembersExcl(backup); - if ( dest!=null && dest.length >0) { + if (dest != null && dest.length > 0) { getChannel().send(dest, msg, getChannelSendOptions()); } if (entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); + ReplicatedMapEntry val = (ReplicatedMapEntry) entry.getValue(); val.setOwner(getMapOwner()); } - } else if ( entry.isCopy() ) { + } else if (entry.isCopy()) { backup = getMapMembers(); if (backup.length > 0) { - msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, - (Serializable)key,null,null,channel.getLocalMember(false),backup); + msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, + (Serializable) key, null, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } } @@ -1098,7 +1083,7 @@ public abstract class AbstractReplicatedMap<K,V> entry.setBackup(false); entry.setProxy(false); entry.setCopy(false); - if ( getMapOwner()!=null ) { + if (getMapOwner() != null) { getMapOwner().objectMadePrimary(key, entry.getValue()); } @@ -1108,7 +1093,7 @@ public abstract class AbstractReplicatedMap<K,V> } } if (log.isTraceEnabled()) { - log.trace("Requesting id:"+key+" result:"+entry.getValue()); + log.trace("Requesting id:" + key + " result:" + entry.getValue()); } return entry.getValue(); } @@ -1116,23 +1101,22 @@ public abstract class AbstractReplicatedMap<K,V> protected void printMap(String header) { try { - System.out.println("\nDEBUG MAP:"+header); - System.out.println("Map[" + - new String(mapContextName, StandardCharsets.ISO_8859_1) + - ", Map Size:" + innerMap.size()); + System.out.println("\nDEBUG MAP:" + header); + System.out.println( + "Map[" + new String(mapContextName, StandardCharsets.ISO_8859_1) + ", Map Size:" + innerMap.size()); Member[] mbrs = getMapMembers(); - for ( int i=0; i<mbrs.length;i++ ) { - System.out.println("Mbr["+(i+1)+"="+mbrs[i].getName()); + for (int i = 0; i < mbrs.length; i++) { + System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName()); } Iterator<Map.Entry<K,MapEntry<K,V>>> i = innerMap.entrySet().iterator(); int cnt = 0; while (i.hasNext()) { Map.Entry<?,?> e = i.next(); - System.out.println( (++cnt) + ". " + innerMap.get(e.getKey())); + System.out.println((++cnt) + ". " + innerMap.get(e.getKey())); } System.out.println("EndMap]\n\n"); - }catch ( Exception ignore) { + } catch (Exception ignore) { if (log.isTraceEnabled()) { log.trace("Error printing map", ignore); } @@ -1140,10 +1124,11 @@ public abstract class AbstractReplicatedMap<K,V> } /** - * Returns true if the key has an entry in the map. - * The entry can be a proxy or a backup entry, invoking <code>get(key)</code> - * will make this entry primary for the group + * Returns true if the key has an entry in the map. The entry can be a proxy or a backup entry, invoking + * <code>get(key)</code> will make this entry primary for the group + * * @param key Object + * * @return boolean */ @Override @@ -1165,28 +1150,28 @@ public abstract class AbstractReplicatedMap<K,V> V old = null; - //make sure that any old values get removed - if ( containsKey(key) ) { + // make sure that any old values get removed + if (containsKey(key)) { old = remove(key); } try { - if ( notify ) { + if (notify) { Member[] backup = publishEntryInfo(key, value); entry.setBackupNodes(backup); } } catch (ChannelException x) { log.error(sm.getString("abstractReplicatedMap.unable.put"), x); } - innerMap.put(key,entry); + innerMap.put(key, entry); return old; } @Override - public void putAll(Map<? extends K, ? extends V> m) { - for (Entry<? extends K, ? extends V> value : m.entrySet()) { + public void putAll(Map<? extends K,? extends V> m) { + for (Entry<? extends K,? extends V> value : m.entrySet()) { @SuppressWarnings("unchecked") - Entry<K, V> entry = (Entry<K, V>) value; + Entry<K,V> entry = (Entry<K,V>) value; put(entry.getKey(), entry.getValue()); } } @@ -1197,8 +1182,8 @@ public abstract class AbstractReplicatedMap<K,V> } public void clear(boolean notify) { - if ( notify ) { - //only delete active keys + if (notify) { + // only delete active keys for (K k : keySet()) { remove(k); } @@ -1210,9 +1195,9 @@ public abstract class AbstractReplicatedMap<K,V> @Override public boolean containsValue(Object value) { Objects.requireNonNull(value); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if (entry!=null && entry.isActive() && value.equals(entry.getValue())) { + if (entry != null && entry.isActive() && value.equals(entry.getValue())) { return true; } } @@ -1220,9 +1205,9 @@ public abstract class AbstractReplicatedMap<K,V> } /** - * Returns the entire contents of the map - * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information - * about the object. + * Returns the entire contents of the map Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object + * containing all the information about the object. + * * @return Set */ public Set<Map.Entry<K,MapEntry<K,V>>> entrySetFull() { @@ -1240,10 +1225,10 @@ public abstract class AbstractReplicatedMap<K,V> @Override public Set<Map.Entry<K,V>> entrySet() { LinkedHashSet<Map.Entry<K,V>> set = new LinkedHashSet<>(innerMap.size()); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { Object key = e.getKey(); MapEntry<K,V> entry = innerMap.get(key); - if ( entry != null && entry.isActive() ) { + if (entry != null && entry.isActive()) { set.add(entry); } } @@ -1252,13 +1237,13 @@ public abstract class AbstractReplicatedMap<K,V> @Override public Set<K> keySet() { - //todo implement - //should only return keys where this is active. + // todo implement + // should only return keys where this is active. LinkedHashSet<K> set = new LinkedHashSet<>(innerMap.size()); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { K key = e.getKey(); MapEntry<K,V> entry = innerMap.get(key); - if ( entry!=null && entry.isActive() ) { + if (entry != null && entry.isActive()) { set.add(key); } } @@ -1269,15 +1254,15 @@ public abstract class AbstractReplicatedMap<K,V> @Override public int size() { - //todo, implement a counter variable instead - //only count active members in this node + // todo, implement a counter variable instead + // only count active members in this node int counter = 0; Iterator<Map.Entry<K,MapEntry<K,V>>> it = innerMap.entrySet().iterator(); - while (it!=null && it.hasNext() ) { + while (it != null && it.hasNext()) { Map.Entry<?,?> e = it.next(); - if ( e != null ) { + if (e != null) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if (entry!=null && entry.isActive() && entry.getValue() != null) { + if (entry != null && entry.isActive() && entry.getValue() != null) { counter++; } } @@ -1287,15 +1272,15 @@ public abstract class AbstractReplicatedMap<K,V> @Override public boolean isEmpty() { - return size()==0; + return size() == 0; } @Override public Collection<V> values() { List<V> values = new ArrayList<>(); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if (entry!=null && entry.isActive() && entry.getValue()!=null) { + if (entry != null && entry.isActive() && entry.getValue() != null) { values.add(entry.getValue()); } } @@ -1303,10 +1288,10 @@ public abstract class AbstractReplicatedMap<K,V> } -//------------------------------------------------------------------------------ -// Map Entry class -//------------------------------------------------------------------------------ - public static class MapEntry<K,V> implements Map.Entry<K,V> { + // ------------------------------------------------------------------------------ + // Map Entry class + // ------------------------------------------------------------------------------ + public static class MapEntry<K, V> implements Map.Entry<K,V> { private boolean backup; private boolean proxy; private boolean copy; @@ -1366,8 +1351,7 @@ public abstract class AbstractReplicatedMap<K,V> } public boolean isDiffable() { - return (value instanceof ReplicatedMapEntry) && - ((ReplicatedMapEntry)value).isDiffable(); + return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry) value).isDiffable(); } public void setBackupNodes(Member[] nodes) { @@ -1421,15 +1405,18 @@ public abstract class AbstractReplicatedMap<K,V> /** * apply a diff, or an entire object - * @param data byte[] + * + * @param data byte[] * @param offset int * @param length int - * @param diff boolean - * @throws IOException IO error + * @param diff boolean + * + * @throws IOException IO error * @throws ClassNotFoundException Deserialization error */ @SuppressWarnings("unchecked") - public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException { + public void apply(byte[] data, int offset, int length, boolean diff) + throws IOException, ClassNotFoundException { if (isDiffable() && diff) { ReplicatedMapEntry rentry = (ReplicatedMapEntry) value; rentry.lock(); @@ -1459,9 +1446,9 @@ public abstract class AbstractReplicatedMap<K,V> } -//------------------------------------------------------------------------------ -// map message to send to and from other maps -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // map message to send to and from other maps + // ------------------------------------------------------------------------------ public static class MapMessage implements Serializable, Cloneable { private static final long serialVersionUID = 1L; @@ -1506,26 +1493,39 @@ public abstract class AbstractReplicatedMap<K,V> public String getTypeDesc() { switch (msgtype) { - case MSG_BACKUP: return "MSG_BACKUP"; - case MSG_RETRIEVE_BACKUP: return "MSG_RETRIEVE_BACKUP"; - case MSG_PROXY: return "MSG_PROXY"; - case MSG_REMOVE: return "MSG_REMOVE"; - case MSG_STATE: return "MSG_STATE"; - case MSG_START: return "MSG_START"; - case MSG_STOP: return "MSG_STOP"; - case MSG_INIT: return "MSG_INIT"; - case MSG_STATE_COPY: return "MSG_STATE_COPY"; - case MSG_COPY: return "MSG_COPY"; - case MSG_ACCESS: return "MSG_ACCESS"; - case MSG_NOTIFY_MAPMEMBER: return "MSG_NOTIFY_MAPMEMBER"; - case MSG_PING: return "MSG_PING"; - default : return "UNKNOWN"; + case MSG_BACKUP: + return "MSG_BACKUP"; + case MSG_RETRIEVE_BACKUP: + return "MSG_RETRIEVE_BACKUP"; + case MSG_PROXY: + return "MSG_PROXY"; + case MSG_REMOVE: + return "MSG_REMOVE"; + case MSG_STATE: + return "MSG_STATE"; + case MSG_START: + return "MSG_START"; + case MSG_STOP: + return "MSG_STOP"; + case MSG_INIT: + return "MSG_INIT"; + case MSG_STATE_COPY: + return "MSG_STATE_COPY"; + case MSG_COPY: + return "MSG_COPY"; + case MSG_ACCESS: + return "MSG_ACCESS"; + case MSG_NOTIFY_MAPMEMBER: + return "MSG_NOTIFY_MAPMEMBER"; + case MSG_PING: + return "MSG_PING"; + default: + return "UNKNOWN"; } } - public MapMessage(byte[] mapId,int msgtype, boolean diff, - Serializable key, Serializable value, - byte[] diffvalue, Member primary, Member[] nodes) { + public MapMessage(byte[] mapId, int msgtype, boolean diff, Serializable key, Serializable value, + byte[] diffvalue, Member primary, Member[] nodes) { this.mapId = mapId; this.msgtype = msgtype; this.diff = diff; @@ -1554,19 +1554,19 @@ public abstract class AbstractReplicatedMap<K,V> public Serializable getKey() { try { return key(null); - } catch ( Exception x ) { + } catch (Exception x) { throw new RuntimeException(sm.getString("mapMessage.deserialize.error.key"), x); } } public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException { - if ( key!=null ) { + if (key != null) { return key; } - if ( keydata == null || keydata.length == 0 ) { + if (keydata == null || keydata.length == 0) { return null; } - key = XByteBuffer.deserialize(keydata,0,keydata.length,cls); + key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls); keydata = null; return key; } @@ -1578,19 +1578,19 @@ public abstract class AbstractReplicatedMap<K,V> public Serializable getValue() { try { return value(null); - } catch ( Exception x ) { + } catch (Exception x) { throw new RuntimeException(sm.getString("mapMessage.deserialize.error.value"), x); } } - public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { - if ( value!=null ) { + public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { + if (value != null) { return value; } - if ( valuedata == null || valuedata.length == 0 ) { + if (valuedata == null || valuedata.length == 0) { return null; } - value = XByteBuffer.deserialize(valuedata,0,valuedata.length,cls); + value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls); valuedata = null; return value; } @@ -1621,11 +1621,11 @@ public abstract class AbstractReplicatedMap<K,V> public void setValue(Serializable value) { try { - if ( value != null ) { + if (value != null) { valuedata = XByteBuffer.serialize(value); } this.value = value; - }catch ( IOException x ) { + } catch (IOException x) { throw new RuntimeException(x); } } @@ -1650,7 +1650,7 @@ public abstract class AbstractReplicatedMap<K,V> throw new AssertionError(); } } - } //MapMessage + } // MapMessage public Channel getChannel() { diff --git a/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java b/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java index 712ffa0a4c..751b2d15de 100644 --- a/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java +++ b/java/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java @@ -27,87 +27,93 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. - * One node is always the primary and one node is always the backup. - * This map is synchronized across a cluster, and only has one backup member.<br> + * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. One node is always the + * primary and one node is always the backup. This map is synchronized across a cluster, and only has one backup + * member.<br> * A perfect usage for this map would be a session map for a session manager in a clustered environment.<br> - * The only way to modify this list is to use the <code>put, putAll, remove</code> methods. - * entrySet, entrySetFull, keySet, keySetFull, returns all non modifiable sets.<br><br> - * If objects (values) in the map change without invoking <code>put()</code> or <code>remove()</code> - * the data can be distributed using two different methods:<br> + * The only way to modify this list is to use the <code>put, putAll, remove</code> methods. entrySet, entrySetFull, + * keySet, keySetFull, returns all non modifiable sets.<br> + * <br> + * If objects (values) in the map change without invoking <code>put()</code> or <code>remove()</code> the data can be + * distributed using two different methods:<br> * <code>replicate(boolean)</code> and <code>replicate(Object, boolean)</code><br> * These two methods are very important two understand. The map can work with two set of value objects:<br> * 1. Serializable - the entire object gets serialized each time it is replicated<br> * 2. ReplicatedMapEntry - this interface allows for a isDirty() flag and to replicate diffs if desired.<br> - * Implementing the <code>ReplicatedMapEntry</code> interface allows you to decide what objects - * get replicated and how much data gets replicated each time.<br> - * If you implement a smart AOP mechanism to detect changes in underlying objects, you can replicate - * only those changes by implementing the ReplicatedMapEntry interface, and return true when isDiffable() - * is invoked.<br><br> - * - * This map implementation doesn't have a background thread running to replicate changes. - * If you do have changes without invoking put/remove then you need to invoke one of the following methods: + * Implementing the <code>ReplicatedMapEntry</code> interface allows you to decide what objects get replicated and how + * much data gets replicated each time.<br> + * If you implement a smart AOP mechanism to detect changes in underlying objects, you can replicate only those changes + * by implementing the ReplicatedMapEntry interface, and return true when isDiffable() is invoked.<br> + * <br> + * This map implementation doesn't have a background thread running to replicate changes. If you do have changes without + * invoking put/remove then you need to invoke one of the following methods: * <ul> * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li> * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li> - * </ul> - * the <code>boolean</code> value in the <code>replicate</code> method used to decide - * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface - * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface - * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code> - * will replicate all objects in this map that are using this node as primary. - * - * <br><br><b>REMEMBER TO CALL</b> <code>breakdown()</code> when you are done with the map to - * avoid memory leaks.<br><br> + * </ul> + * the <code>boolean</code> value in the <code>replicate</code> method used to decide whether to only replicate objects + * that implement the <code>ReplicatedMapEntry</code> interface or to replicate all objects. If an object doesn't + * implement the <code>ReplicatedMapEntry</code> interface each time the object gets replicated the entire object gets + * serialized, hence a call to <code>replicate(true)</code> will replicate all objects in this map that are using this + * node as primary. <br> + * <br> + * <b>REMEMBER TO CALL</b> <code>breakdown()</code> when you are done with the map to avoid memory leaks.<br> + * <br> * TODO implement periodic sync/transfer thread * * @param <K> The type of Key * @param <V> The type of Value */ -public class LazyReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { +public class LazyReplicatedMap<K, V> extends AbstractReplicatedMap<K,V> { private static final long serialVersionUID = 1L; // Lazy init to support serialization private transient volatile Log log; -//------------------------------------------------------------------------------ -// CONSTRUCTORS / DESTRUCTORS -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // CONSTRUCTORS / DESTRUCTORS + // ------------------------------------------------------------------------------ /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages - * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages + * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap - * @param cls Class loaders + * @param loadFactor float - load factor, see HashMap + * @param cls Class loaders */ - public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, float loadFactor, ClassLoader[] cls) { - super(owner,channel,timeout,mapContextName,initialCapacity,loadFactor, Channel.SEND_OPTIONS_DEFAULT,cls, true); + public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, + float loadFactor, ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls, + true); } /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages - * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages + * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap - * @param cls Class loaders + * @param cls Class loaders */ - public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, ClassLoader[] cls) { + public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, + ClassLoader[] cls) { super(owner, channel, timeout, mapContextName, initialCapacity, DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls, true); } /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param cls Class loaders + * @param cls Class loaders */ public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, @@ -116,22 +122,24 @@ public class LazyReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param cls Class loaders - * @param terminate boolean - Flag for whether to terminate this map that failed to start. + * @param cls Class loaders + * @param terminate boolean - Flag for whether to terminate this map that failed to start. */ - public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls, boolean terminate) { + public LazyReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls, + boolean terminate) { super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls, terminate); } -//------------------------------------------------------------------------------ -// METHODS TO OVERRIDE -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + // ------------------------------------------------------------------------------ @Override protected int getStateMessageType() { return AbstractReplicatedMap.MapMessage.MSG_STATE; @@ -145,7 +153,7 @@ public class LazyReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { @Override protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { Log log = getLog(); - if (! (key instanceof Serializable && value instanceof Serializable) ) { + if (!(key instanceof Serializable && value instanceof Serializable)) { return new Member[0]; } Member[] members = getMapMembers(); @@ -153,19 +161,19 @@ public class LazyReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { int nextIdx = firstIdx; Member[] backup = new Member[0]; - //there are no backups - if ( members.length == 0 || firstIdx == -1 ) { + // there are no backups + if (members.length == 0 || firstIdx == -1) { return backup; } boolean success = false; do { - //select a backup node + // select a backup node Member next = members[nextIdx]; - //increment for the next round of back up selection + // increment for the next round of back up selection nextIdx = nextIdx + 1; - if ( nextIdx >= members.length ) { + if (nextIdx >= members.length) { nextIdx = 0; } @@ -175,41 +183,41 @@ public class LazyReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { MapMessage msg = null; try { Member[] tmpBackup = wrap(next); - //publish the backup data to one node - msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, - (Serializable) key, (Serializable) value, null, channel.getLocalMember(false), tmpBackup); - if ( log.isTraceEnabled() ) { - log.trace("Publishing backup data:"+msg+" to: "+next.getName()); + // publish the backup data to one node + msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false, (Serializable) key, + (Serializable) value, null, channel.getLocalMember(false), tmpBackup); + if (log.isTraceEnabled()) { + log.trace("Publishing backup data:" + msg + " to: " + next.getName()); } UniqueId id = getChannel().send(tmpBackup, msg, getChannelSendOptions()); - if ( log.isTraceEnabled() ) { - log.trace("Data published:"+msg+" msg Id:"+id); + if (log.isTraceEnabled()) { + log.trace("Data published:" + msg + " msg Id:" + id); } - //we published out to a backup, mark the test success + // we published out to a backup, mark the test success success = true; backup = tmpBackup; - }catch ( ChannelException x ) { + } catch (ChannelException x) { log.error(sm.getString("lazyReplicatedMap.unableReplicate.backup", key, next, x.getMessage()), x); continue; } try { - //publish the data out to all nodes + // publish the data out to all nodes Member[] proxies = excludeFromSet(backup, getMapMembers()); - if (success && proxies.length > 0 ) { - msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, - (Serializable) key, null, null, channel.getLocalMember(false),backup); - if ( log.isTraceEnabled() ) { - log.trace("Publishing proxy data:"+msg+" to: "+Arrays.toNameString(proxies)); + if (success && proxies.length > 0) { + msg = new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable) key, null, + null, channel.getLocalMember(false), backup); + if (log.isTraceEnabled()) { + log.trace("Publishing proxy data:" + msg + " to: " + Arrays.toNameString(proxies)); } getChannel().send(proxies, msg, getChannelSendOptions()); } - }catch ( ChannelException x ) { - //log the error, but proceed, this should only happen if a node went down, - //and if the node went down, then it can't receive the message, the others - //should still get it. + } catch (ChannelException x) { + // log the error, but proceed, this should only happen if a node went down, + // and if the node went down, then it can't receive the message, the others + // should still get it. log.error(sm.getString("lazyReplicatedMap.unableReplicate.proxy", key, next, x.getMessage()), x); } - } while ( !success && (firstIdx!=nextIdx)); + } while (!success && (firstIdx != nextIdx)); return backup; } diff --git a/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java b/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java index 92c9d6f2e2..a935098579 100644 --- a/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java +++ b/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java @@ -30,74 +30,80 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** - * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical - * copy of the map.<br><br> - * This map implementation doesn't have a background thread running to replicate changes. - * If you do have changes without invoking put/remove then you need to invoke one of the following methods: + * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical copy of the + * map.<br> + * <br> + * This map implementation doesn't have a background thread running to replicate changes. If you do have changes without + * invoking put/remove then you need to invoke one of the following methods: * <ul> * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li> * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li> - * </ul> - * the <code>boolean</code> value in the <code>replicate</code> method used to decide - * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface - * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface - * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code> - * will replicate all objects in this map that are using this node as primary. - * - * <br><br><b>REMEMBER TO CALL <code>breakdown()</code> - * when you are done with the map to avoid memory leaks.</b><br><br> + * </ul> + * the <code>boolean</code> value in the <code>replicate</code> method used to decide whether to only replicate objects + * that implement the <code>ReplicatedMapEntry</code> interface or to replicate all objects. If an object doesn't + * implement the <code>ReplicatedMapEntry</code> interface each time the object gets replicated the entire object gets + * serialized, hence a call to <code>replicate(true)</code> will replicate all objects in this map that are using this + * node as primary. <br> + * <br> + * <b>REMEMBER TO CALL <code>breakdown()</code> when you are done with the map to avoid memory leaks.</b><br> + * <br> * TODO implement periodic sync/transfer thread<br> - * TODO memberDisappeared, should do nothing except change map membership - * by default it relocates the primary objects + * TODO memberDisappeared, should do nothing except change map membership by default it relocates the primary objects * * @param <K> The type of Key * @param <V> The type of Value */ -public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { +public class ReplicatedMap<K, V> extends AbstractReplicatedMap<K,V> { private static final long serialVersionUID = 1L; // Lazy init to support serialization private transient volatile Log log; - //-------------------------------------------------------------------------- - // CONSTRUCTORS / DESTRUCTORS - //-------------------------------------------------------------------------- + // -------------------------------------------------------------------------- + // CONSTRUCTORS / DESTRUCTORS + // -------------------------------------------------------------------------- /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages - * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages + * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap - * @param cls Class loaders + * @param loadFactor float - load factor, see HashMap + * @param cls Class loaders */ - public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity,float loadFactor, ClassLoader[] cls) { - super(owner,channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls, true); + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, + float loadFactor, ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls, + true); } /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages - * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages + * @param mapContextName String - unique name for this map, to allow multiple maps per channel * @param initialCapacity int - the size of this map, see HashMap - * @param cls Class loaders + * @param cls Class loaders */ - public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, ClassLoader[] cls) { - super(owner,channel, timeout, mapContextName, initialCapacity, DEFAULT_LOAD_FACTOR, + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, int initialCapacity, + ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls, true); } /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param cls Class loaders + * @param cls Class loaders */ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, @@ -106,21 +112,23 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { /** * Creates a new map - * @param owner The map owner - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messages + * + * @param owner The map owner + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messages * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param cls Class loaders - * @param terminate boolean - Flag for whether to terminate this map that failed to start. + * @param cls Class loaders + * @param terminate boolean - Flag for whether to terminate this map that failed to start. */ - public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls, boolean terminate) { + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls, + boolean terminate) { super(owner, channel, timeout, mapContextName, DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls, terminate); } -//------------------------------------------------------------------------------ -// METHODS TO OVERRIDE -//------------------------------------------------------------------------------ + // ------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + // ------------------------------------------------------------------------------ @Override protected int getStateMessageType() { return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; @@ -133,10 +141,10 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { @Override protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { - if (! (key instanceof Serializable && value instanceof Serializable) ) { + if (!(key instanceof Serializable && value instanceof Serializable)) { return new Member[0]; } - //select a backup node + // select a backup node Member[] backup = getMapMembers(); if (backup == null || backup.length == 0) { @@ -144,9 +152,9 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { } try { - //publish the data out to all nodes - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, - (Serializable) key, (Serializable) value, null,channel.getLocalMember(false), backup); + // publish the data out to all nodes + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable) key, + (Serializable) value, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } catch (ChannelException e) { @@ -181,29 +189,29 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { boolean removed = false; Log log = getLog(); synchronized (mapMembers) { - removed = (mapMembers.remove(member) != null ); + removed = (mapMembers.remove(member) != null); if (!removed) { if (log.isDebugEnabled()) { log.debug(sm.getString("replicatedMap.member.disappeared.unknown", member)); } - return; //the member was not part of our map. + return; // the member was not part of our map. } } if (log.isInfoEnabled()) { log.info(sm.getString("replicatedMap.member.disappeared", member)); } long start = System.currentTimeMillis(); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if (entry==null) { + if (entry == null) { continue; } if (entry.isPrimary()) { try { Member[] backup = getMapMembers(); if (backup.length > 0) { - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, - (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup); + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, + (Serializable) entry.getKey(), null, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } entry.setBackupNodes(backup); @@ -215,11 +223,9 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { entry.setPrimary(null); } - if ( entry.getPrimary() == null && - entry.isCopy() && - entry.getBackupNodes()!=null && - entry.getBackupNodes().length > 0 && - entry.getBackupNodes()[0].equals(channel.getLocalMember(false)) ) { + if (entry.getPrimary() == null && entry.isCopy() && entry.getBackupNodes() != null && + entry.getBackupNodes().length > 0 && + entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) { try { entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); @@ -227,13 +233,13 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { entry.setCopy(false); Member[] backup = getMapMembers(); if (backup.length > 0) { - MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER,false, - (Serializable)entry.getKey(),null,null,channel.getLocalMember(false),backup); + MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_NOTIFY_MAPMEMBER, false, + (Serializable) entry.getKey(), null, null, channel.getLocalMember(false), backup); getChannel().send(backup, msg, getChannelSendOptions()); } entry.setBackupNodes(backup); - if ( mapOwner!=null ) { - mapOwner.objectMadePrimary(entry.getKey(),entry.getValue()); + if (mapOwner != null) { + mapOwner.objectMadePrimary(entry.getKey(), entry.getValue()); } } catch (ChannelException x) { @@ -241,35 +247,34 @@ public class ReplicatedMap<K,V> extends AbstractReplicatedMap<K,V> { } } - } //while + } // while long complete = System.currentTimeMillis() - start; if (log.isInfoEnabled()) { - log.info(sm.getString("replicatedMap.relocate.complete", - Long.toString(complete))); + log.info(sm.getString("replicatedMap.relocate.complete", Long.toString(complete))); } } @Override public void mapMemberAdded(Member member) { - if ( member.equals(getChannel().getLocalMember(false)) ) { + if (member.equals(getChannel().getLocalMember(false))) { return; } boolean memberAdded = false; synchronized (mapMembers) { - if (!mapMembers.containsKey(member) ) { + if (!mapMembers.containsKey(member)) { mapMembers.put(member, Long.valueOf(System.currentTimeMillis())); memberAdded = true; } } - if ( memberAdded ) { + if (memberAdded) { synchronized (stateMutex) { Member[] backup = getMapMembers(); - for (Entry<K, MapEntry<K, V>> e : innerMap.entrySet()) { + for (Entry<K,MapEntry<K,V>> e : innerMap.entrySet()) { MapEntry<K,V> entry = innerMap.get(e.getKey()); - if ( entry == null ) { + if (entry == null) { continue; } - if (entry.isPrimary() && !inSet(member,entry.getBackupNodes())) { + if (entry.isPrimary() && !inSet(member, entry.getBackupNodes())) { entry.setBackupNodes(backup); } } diff --git a/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java b/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java index 7b97aef157..9cf98380e8 100644 --- a/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java +++ b/java/org/apache/catalina/tribes/tipis/ReplicatedMapEntry.java @@ -32,8 +32,7 @@ import java.io.Serializable; * 5. entry.unlock();<br> * }<br> * }<br> - * </code> - * <br> + * </code> <br> * <br> * When the data is deserialized the logic is called in the following order<br> * <code> @@ -44,22 +43,25 @@ import java.io.Serializable; public interface ReplicatedMapEntry extends Serializable { /** - * Has the object changed since last replication - * and is not in a locked state + * Has the object changed since last replication and is not in a locked state + * * @return boolean */ boolean isDirty(); /** - * If this returns true, the map will extract the diff using getDiff() - * Otherwise it will serialize the entire object. + * If this returns true, the map will extract the diff using getDiff() Otherwise it will serialize the entire + * object. + * * @return boolean */ boolean isDiffable(); /** * Returns a diff and sets the dirty map to false + * * @return Serialized diff data + * * @throws IOException IO error serializing */ byte[] getDiff() throws IOException; @@ -67,10 +69,12 @@ public interface ReplicatedMapEntry extends Serializable { /** * Applies a diff to an existing object. - * @param diff Serialized diff data + * + * @param diff Serialized diff data * @param offset Array offset * @param length Array length - * @throws IOException IO error deserializing + * + * @throws IOException IO error deserializing * @throws ClassNotFoundException Serialization error */ void applyDiff(byte[] diff, int offset, int length) throws IOException, ClassNotFoundException; @@ -91,24 +95,24 @@ public interface ReplicatedMapEntry extends Serializable { void unlock(); /** - * This method is called after the object has been - * created on a remote map. On this method, - * the object can initialize itself for any data that wasn't + * This method is called after the object has been created on a remote map. On this method, the object can + * initialize itself for any data that wasn't * * @param owner Object */ void setOwner(Object owner); /** - * For accuracy checking, a serialized attribute can contain a version number - * This number increases as modifications are made to the data. - * The replicated map can use this to ensure accuracy on a periodic basis + * For accuracy checking, a serialized attribute can contain a version number This number increases as modifications + * are made to the data. The replicated map can use this to ensure accuracy on a periodic basis + * * @return long - the version number or -1 if the data is not versioned */ long getVersion(); /** * Forces a certain version to a replicated map entry<br> + * * @param version long */ void setVersion(long version); @@ -120,12 +124,14 @@ public interface ReplicatedMapEntry extends Serializable { /** * Set the last replicate time. + * * @param lastTimeReplicated New timestamp */ void setLastTimeReplicated(long lastTimeReplicated); /** * If this returns true, to replicate that an object has been accessed + * * @return boolean */ boolean isAccessReplicate(); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org