Author: fhanik
Date: Fri Mar 10 09:24:53 2006
New Revision: 384858
URL: http://svn.apache.org/viewcvs?rev=384858&view=rev
Log:
the map implementation is complete and ready to be tested
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384858&r1=384857&r2=384858&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 10 09:24:53 2006
@@ -15,31 +15,30 @@
*/
package org.apache.catalina.tribes.tipis;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.catalina.tribes.Channel;
-import java.io.Serializable;
-import org.apache.catalina.tribes.Member;
-import java.io.UnsupportedEncodingException;
+import java.io.Externalizable;
import java.io.IOException;
-import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import java.util.Set;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
-import java.util.Collection;
+import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import java.io.Externalizable;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
+import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.mcast.McastMember;
-import java.util.Iterator;
-import org.apache.catalina.tribes.ChannelException;
-import java.util.LinkedList;
-import java.util.LinkedHashSet;
-import java.util.ArrayList;
-import java.util.Arrays;
/**
* @todo implement periodic sync/transfer
@@ -55,9 +54,10 @@
// INSTANCE VARIABLES
//------------------------------------------------------------------------------
- private Channel channel;
- private RpcChannel rpcChannel;
- private byte[] mapContextName;
+ private transient Channel channel;
+ private transient RpcChannel rpcChannel;
+ private transient byte[] mapContextName;
+ private transient boolean stateTransferred = false;
//------------------------------------------------------------------------------
@@ -109,13 +109,36 @@
this.rpcChannel = null;
this.channel = null;
super.clear();
+ this.stateTransferred = false;
}
//------------------------------------------------------------------------------
// GROUP COM INTERFACES
//------------------------------------------------------------------------------
public void transferState() {
- throw new UnsupportedOperationException();
+ try {
+ Member backup =
channel.getMembers().length>0?channel.getMembers()[0]:null;
+ if ( backup != null ) {
+ MapMessage msg = new
MapMessage(mapContextName,MapMessage.MSG_STATE,false,
+ null,null,null,null);
+ Response[] resp = rpcChannel.send(new Member[]
{backup},msg,rpcChannel.FIRST_REPLY,TIME_OUT);
+ if ( resp.length > 0 ) {
+ msg = (MapMessage)resp[0].getMessage();
+ ArrayList list = (ArrayList)msg.getValue();
+ for (int i=0; i<list.size(); i++ ) {
+ MapMessage m = (MapMessage)list.get(i);
+ MapEntry entry = new MapEntry(m.getKey(),m.getValue());
+ entry.setBackup(false);
+ entry.setProxy(true);
+ entry.setBackupNode(m.getBackupNode());
+ super.put(entry.getKey(),entry);
+ }
+ }
+ }
+ } catch ( ChannelException x ) {
+ log.error("Unable to transfer LazyReplicatedMap state.",x);
+ }
+ stateTransferred = true;
}
/**
@@ -126,12 +149,33 @@
public Serializable replyRequest(Serializable msg, Member sender) {
if ( !(msg instanceof MapMessage) ) return null;
MapMessage mapmsg = (MapMessage)msg;
- if ( mapmsg.getMsgType() != mapmsg.MSG_RETRIEVE_BACKUP ) return null;
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if ( entry == null ) return null;
- mapmsg.setValue((Serializable)entry.getValue());
- return mapmsg;
+ //backup request
+ if ( mapmsg.getMsgType() == mapmsg.MSG_RETRIEVE_BACKUP ) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null)return null;
+ mapmsg.setValue( (Serializable) entry.getValue());
+ return mapmsg;
+ }
+
+ //state transfer request
+ if ( mapmsg.getMsgType() == mapmsg.MSG_STATE ) {
+ ArrayList list = new ArrayList();
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ MapMessage me = new
MapMessage(mapContextName,MapMessage.MSG_PROXY,
+
false,(Serializable)entry.getKey(),(Serializable)entry.getValue(),
+ null,entry.getBackupNode());
+ list.add(me);
+ }
+ mapmsg.setValue(list);
+ return mapmsg;
+ }
+
+ return null;
+
}
/**
@@ -145,8 +189,45 @@
}
public void messageReceived(Serializable msg, Member sender) {
- throw new UnsupportedOperationException();
//todo implement all the messages that we can receive
+ //messages we can receive are MSG_PROXY, MSG_BACKUP
+ if ( !(msg instanceof MapMessage) ) return;
+
+ MapMessage mapmsg = (MapMessage)msg;
+ if ( mapmsg.getMsgType() == MapMessage.MSG_PROXY ) {
+ MapEntry entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
+ entry.setBackup(false);
+ entry.setProxy(true);
+ entry.setBackupNode(mapmsg.getBackupNode());
+ super.put(entry.getKey(),entry);
+ }
+
+ if ( mapmsg.getMsgType() == MapMessage.MSG_BACKUP ) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if ( entry == null ) {
+ entry = new MapEntry(mapmsg.getKey(),mapmsg.getValue());
+ entry.setBackup(true);
+ entry.setProxy(false);
+ entry.setBackupNode(mapmsg.getBackupNode());
+ super.put(entry.getKey(), entry);
+ } else {
+ if ( mapmsg.isDiff() ) {
+ if ( entry.getValue() instanceof Diffable ) {
+ Diffable diff = (Diffable)entry.getValue();
+ try {
+ diff.applyDiff(mapmsg.getDiffValue(), 0,
mapmsg.getDiffValue().length);
+ }catch ( IOException x ) {
+ log.error("Unable to apply diff to
key:"+entry.getKey(),x);
+ }
+ } else {
+ log.warn("Received a DIFF replication, but
object["+entry.getValue()+"] does not implement Diffable");
+ }
+ } else {
+ entry.setValue(mapmsg.getValue());
+ }
+ }
+ }
+
}
public boolean accept(Serializable msg, Member sender) {
@@ -304,7 +385,17 @@
}
public boolean containsValue(Object value) {
- return super.containsValue(value);
+ if ( value == null ) {
+ return super.containsValue(value);
+ } else {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary() && value.equals(entry.getValue()))
return true;
+ }//while
+ return false;
+ }//end if
}
public Object clone() {
@@ -440,11 +531,11 @@
}
public int hashCode() {
- return key.hashCode();
+ return value.hashCode();
}
public boolean equals(Object o) {
- return key.equals(o);
+ return value.equals(o);
}
/**
@@ -486,6 +577,7 @@
public static final int MSG_RETRIEVE_BACKUP = 2;
public static final int MSG_PROXY = 3;
public static final int MSG_REMOVE = 4;
+ public static final int MSG_STATE = 5;
private byte[] mapId;
private int msgtype;
@@ -545,7 +637,8 @@
in.read(mapId);
msgtype = in.readInt();
switch (msgtype) {
- case MSG_BACKUP: {
+ case MSG_BACKUP:
+ case MSG_STATE: {
diff = in.readBoolean();
key = (Serializable)in.readObject();
if ( diff ) {
@@ -576,7 +669,8 @@
out.write(mapId);
out.writeInt(msgtype);
switch (msgtype) {
- case MSG_BACKUP: {
+ case MSG_BACKUP:
+ case MSG_STATE: {
out.writeBoolean(diff);
out.writeObject(key);
if ( diff ) {
@@ -601,6 +695,10 @@
}
}//switch
}//writeExternal
+
+ public Object clone() {
+ return new
MapMessage(this.mapId,this.msgtype,this.diff,this.key,this.value,this.diffvalue,this.node);
+ }
}//MapMessage
//------------------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]