fhanik 2004/01/12 20:22:28 Modified: modules/cluster/src/share/org/apache/catalina/cluster/session DeltaManager.java DeltaRequest.java DeltaSession.java modules/cluster/src/share/org/apache/catalina/cluster/tcp PooledSocketSender.java ReplicationTransmitter.java SimpleTcpCluster.java Log: Fixed a deadlock bug in the pooled socket sender when a member crashes. Recycling the delta request objects to avoid object instantiation, although I actually think this is slower. Fixed the callback with the session and the delta request execution. Revision Changes Path 1.5 +9 -10 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Index: DeltaManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- DeltaManager.java 13 Jan 2004 00:07:18 -0000 1.4 +++ DeltaManager.java 13 Jan 2004 04:22:28 -0000 1.5 @@ -406,24 +406,23 @@ } - private DeltaRequest loadDeltaRequest(byte[] data) throws + private DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) throws ClassNotFoundException, IOException { ByteArrayInputStream fis = null; ReplicationStream ois = null; Loader loader = null; ClassLoader classLoader = null; fis = new ByteArrayInputStream(data); - BufferedInputStream bis = new BufferedInputStream(fis); ois = new ReplicationStream(fis,container.getLoader().getClassLoader()); - DeltaRequest dreq = (DeltaRequest)ois.readObject(); + session.getDeltaRequest().readExternal(ois); ois.close(); - return dreq; + return session.getDeltaRequest(); } private byte[] unloadDeltaRequest(DeltaRequest deltaRequest) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(deltaRequest); + deltaRequest.writeExternal(oos); 
oos.flush(); oos.close(); return bos.toByteArray(); @@ -874,8 +873,8 @@ } case SessionMessage.EVT_SESSION_DELTA : { byte[] delta = msg.getSession(); - DeltaRequest dreq = loadDeltaRequest(delta); DeltaSession session = (DeltaSession)findSession(msg.getSessionID()); + DeltaRequest dreq = loadDeltaRequest(session,delta); dreq.execute(session); session.setPrimarySession(false); 1.4 +71 -23 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java Index: DeltaRequest.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- DeltaRequest.java 13 Jan 2004 00:07:18 -0000 1.3 +++ DeltaRequest.java 13 Jan 2004 04:22:28 -0000 1.4 @@ -66,7 +66,9 @@ /** * This class is used to track the series of actions that happens when - * a request is executed. These actions will then + * a request is executed. These actions will then translate into invokations of methods + * on the actual session. + * This class is NOT thread safe. 
One DeltaRequest per session * @author <a href="mailto:[EMAIL PROTECTED]">Filip Hanik</a> * @version 1.0 */ @@ -95,6 +97,8 @@ private String sessionId; private LinkedList actions = new LinkedList(); + private LinkedList actionPool = new LinkedList(); + private boolean recordAllActions = false; public DeltaRequest() { @@ -140,7 +144,13 @@ int action, String name, Object value) { - AttributeInfo info = new AttributeInfo(type,action,name,value); + AttributeInfo info = null; + if ( this.actionPool.size() > 0 ) { + info = (AttributeInfo)actionPool.removeFirst(); + info.init(type,action,name,value); + } else { + info = new AttributeInfo(type, action, name, value); + } //if we have already done something to this attribute, make sure //we don't send multiple actions across the wire if ( !recordAllActions) actions.remove(info); @@ -148,25 +158,25 @@ actions.addLast(info); } - public void execute(ClusterSession session) { + public void execute(DeltaSession session) { if ( !this.sessionId.equals( session.getId() ) ) throw new java.lang.IllegalArgumentException("Session id mismatch, not executing the delta request"); for ( int i=0; i<actions.size(); i++ ) { AttributeInfo info = (AttributeInfo)actions.get(i); switch ( info.getType() ) { case TYPE_ATTRIBUTE: { - if ( info.getAction() == ACTION_SET ) - session.setAttribute(info.getName(),info.getValue()); - else - session.removeAttribute(info.getName()); + if ( info.getAction() == ACTION_SET ) { + session.setAttribute(info.getName(), info.getValue(),false); + } else + session.removeAttribute(info.getName(),true,false); break; }//case case TYPE_ISNEW: { - session.setNew(((Boolean)info.getValue()).booleanValue()); + session.setNew(((Boolean)info.getValue()).booleanValue(),false); break; }//case case TYPE_MAXINTERVAL: { - session.setMaxInactiveInterval(((Integer)info.getValue()).intValue()); + session.setMaxInactiveInterval(((Integer)info.getValue()).intValue(),false); break; }//case case TYPE_PRINCIPAL: { @@ -175,7 +185,7 @@ 
SerializablePrincipal sp = (SerializablePrincipal)info.getValue(); p = (Principal)sp.getPrincipal(session.getManager().getContainer().getRealm()); } - session.setPrincipal(p); + session.setPrincipal(p,false); break; }//case default : throw new java.lang.IllegalArgumentException("Invalid attribute info type="+info); @@ -184,8 +194,14 @@ } public void reset() { + while ( actions.size() > 0 ) { + AttributeInfo info = (AttributeInfo)actions.removeFirst(); + info.recycle(); + actionPool.addLast(info); + } actions.clear(); } + public String getSessionId() { return sessionId; } @@ -199,21 +215,29 @@ return actions.size(); } - public void readExternal(java.io.ObjectInput in ) throws java.io.IOException, - java.lang.ClassNotFoundException { - //sessionId - String - //recordAll - boolean - //size - int - //AttributeInfo - in an array + public void readExternal(java.io.ObjectInput in) throws java.io.IOException, + java.lang.ClassNotFoundException { + //sessionId - String + //recordAll - boolean + //size - int + //AttributeInfo - in an array + reset(); sessionId = in.readUTF(); recordAllActions = in.readBoolean(); int cnt = in.readInt(); - if ( actions == null ) + if (actions == null) actions = new LinkedList(); else actions.clear(); for (int i = 0; i < cnt; i++) { - AttributeInfo info = (AttributeInfo)in.readObject(); + AttributeInfo info = null; + if (this.actionPool.size() > 0) { + info = (AttributeInfo) actionPool.removeFirst(); + } + else { + info = new AttributeInfo(-1,-1,null,null); + } + info.readExternal(in); actions.addLast(info); }//for } @@ -230,7 +254,7 @@ out.writeInt(getSize()); for ( int i=0; i<getSize(); i++ ) { AttributeInfo info = (AttributeInfo)actions.get(i); - out.writeObject(info); + info.writeExternal(out); } } @@ -239,11 +263,21 @@ private Object value = null; private int action; private int type; + public AttributeInfo() {} + public AttributeInfo(int type, int action, String name, Object value) { + super(); + init(type,action,name,value); + } + + 
public void init(int type, + int action, + String name, + Object value) { this.name = name; this.value = value; this.action = action; @@ -268,6 +302,13 @@ public String getName() { return name; } + + public void recycle() { + name = null; + value = null; + type=-1; + action=-1; + } public boolean equals(Object o) { if ( ! (o instanceof AttributeInfo ) ) return false; @@ -298,7 +339,14 @@ out.writeUTF(getName()); out.writeObject(getValue()); } - + + public String toString() { + StringBuffer buf = new StringBuffer("AttributeInfo[type="); + buf.append(getType()).append(", action=").append(getAction()); + buf.append(", name=").append(getName()).append(", value=").append(getValue()); + buf.append(", addr=").append(super.toString()).append("]"); + return buf.toString(); + } } 1.7 +30 -16 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java Index: DeltaSession.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- DeltaSession.java 12 Jan 2004 07:50:06 -0000 1.6 +++ DeltaSession.java 13 Jan 2004 04:22:28 -0000 1.7 @@ -535,12 +535,15 @@ * @param interval The new maximum interval */ public void setMaxInactiveInterval(int interval) { + setMaxInactiveInterval(interval,true); + } + public void setMaxInactiveInterval(int interval, boolean addDeltaRequest) { this.maxInactiveInterval = interval; if (isValid && interval == 0) { expire(); } else { - deltaRequest.setMaxInactiveInterval(interval); + if ( addDeltaRequest ) deltaRequest.setMaxInactiveInterval(interval); } } @@ -552,8 +555,11 @@ * @param isNew The new value for the <code>isNew</code> flag */ public void setNew(boolean isNew) { + setNew(isNew,true); + } + public void setNew(boolean isNew, boolean addDeltaRequest) { this.isNew = isNew; - 
deltaRequest.setNew(isNew); + if (addDeltaRequest) deltaRequest.setNew(isNew); } @@ -580,11 +586,13 @@ * @param principal The new Principal, or <code>null</code> if none */ public void setPrincipal(Principal principal) { - + setPrincipal(principal,true); + } + public void setPrincipal(Principal principal,boolean addDeltaRequest) { Principal oldPrincipal = this.principal; this.principal = principal; support.firePropertyChange("principal", oldPrincipal, this.principal); - deltaRequest.setPrincipal(principal); + if (addDeltaRequest) deltaRequest.setPrincipal(principal); } @@ -919,6 +927,11 @@ deltaRequest.setSessionId(getId()); } } + + public DeltaRequest getDeltaRequest() { + if ( deltaRequest == null ) resetDeltaRequest(); + return deltaRequest; + } // ------------------------------------------------- HttpSession Properties @@ -1162,6 +1175,10 @@ * invalidated session */ public void removeAttribute(String name, boolean notify) { + removeAttribute(name,notify,true); + } + + public void removeAttribute(String name, boolean notify, boolean addDeltaRequest) { // Validate our current state if (!isValid()) @@ -1181,7 +1198,7 @@ } } - deltaRequest.removeAttribute(name); + if (addDeltaRequest) deltaRequest.removeAttribute(name); // Do we need to do valueUnbound() and attributeRemoved() notification? 
if (!notify) { @@ -1271,6 +1288,9 @@ * invalidated session */ public void setAttribute(String name, Object value) { + setAttribute(name,value,true); + } + public void setAttribute(String name, Object value, boolean addDeltaRequest) { // Name cannot be null if (name == null) @@ -1287,7 +1307,7 @@ throw new IllegalArgumentException("Attribute ["+name+"] is not serializable"); } - deltaRequest.setAttribute(name,value); + if (addDeltaRequest) deltaRequest.setAttribute(name,value); // Validate our current state if (!isValid()) @@ -1603,12 +1623,6 @@ } } - - - public DeltaRequest getDeltaRequest() { - return deltaRequest; - } - } 1.2 +16 -5 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Index: PooledSocketSender.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- PooledSocketSender.java 9 Jan 2004 23:24:09 -0000 1.1 +++ PooledSocketSender.java 13 Jan 2004 04:22:28 -0000 1.2 @@ -117,6 +117,7 @@ public void connect() throws java.io.IOException { //do nothing, happens in the socket sender itself + senderQueue.open(); } public void disconnect() @@ -194,6 +195,7 @@ private LinkedList queue = new LinkedList(); private LinkedList inuse = new LinkedList(); private Object mutex = new Object(); + private boolean isOpen = true; public SenderQueue(PooledSocketSender parent, int limit) { this.limit = limit; @@ -206,7 +208,7 @@ long delta = 0; do { synchronized (mutex) { - + if ( !isOpen ) throw new IllegalStateException("Socket pool is closed."); if ( queue.size() > 0 ) { sender = (SocketSender) queue.removeFirst(); } else if ( inuse.size() < limit ) { @@ -223,7 +225,7 @@ } }//synchronized delta = System.currentTimeMillis() - start; - } while ( (sender == null) && (timeout==0?true:(delta<timeout)) ); + } 
while ( (isOpen) && (sender == null) && (timeout==0?true:(delta<timeout)) ); //to do return sender; } @@ -259,6 +261,15 @@ }//for queue.clear(); inuse.clear(); + isOpen = false; + mutex.notifyAll(); + } + } + + public void open() { + synchronized (mutex) { + isOpen = true; + mutex.notifyAll(); } } } 1.12 +4 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Index: ReplicationTransmitter.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v retrieving revision 1.11 retrieving revision 1.12 diff -u -r1.11 -r1.12 --- ReplicationTransmitter.java 20 Dec 2003 00:48:52 -0000 1.11 +++ ReplicationTransmitter.java 13 Jan 2004 04:22:28 -0000 1.12 @@ -101,7 +101,7 @@ String key = addr.getHostAddress()+":"+port; IDataSender sender = (IDataSender)map.get(key); if ( sender == null ) return; - if ( sender.isConnected() ) sender.disconnect(); + sender.disconnect(); map.remove(key); } 1.26 +8 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java Index: SimpleTcpCluster.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v retrieving revision 1.25 retrieving revision 1.26 diff -u -r1.25 -r1.26 --- SimpleTcpCluster.java 13 Jan 2004 00:07:18 -0000 1.25 +++ SimpleTcpCluster.java 13 Jan 2004 04:22:28 -0000 1.26 @@ -699,8 +699,10 @@ ClusterManager mgr = (ClusterManager) managers.get(key); if (mgr != null) mgr.messageDataReceived(msg); - else - log.warn("Context manager doesn't exist:" + key); + else { + //this happens a lot before the system has started up + log.debug("Context manager doesn't exist:" + key); + } }//while } else { ClusterManager mgr = (ClusterManager) managers.get(name);
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]