fhanik 2004/01/11 21:23:11 Modified: modules/cluster/src/share/org/apache/catalina/cluster ClusterSession.java SessionMessage.java modules/cluster/src/share/org/apache/catalina/cluster/session DeltaManager.java DeltaSession.java ReplicatedSession.java SimpleTcpReplicationManager.java modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationValve.java Added: modules/cluster/src/share/org/apache/catalina/cluster/session DeltaRequest.java Log: Started to implement delta replication. This will allow for less data to be sent over the wire. Revision Changes Path 1.2 +67 -2 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSession.java Index: ClusterSession.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSession.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- ClusterSession.java 15 Nov 2003 00:58:20 -0000 1.1 +++ ClusterSession.java 12 Jan 2004 05:23:10 -0000 1.2 @@ -1,8 +1,73 @@ +/* + * $Header$ + * $Revision$ + * $Date$ + * + * ==================================================================== + * + * The Apache Software License, Version 1.1 + * + * Copyright (c) 1999 The Apache Software Foundation. All rights + * reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * 3. 
The end-user documentation included with the redistribution, if + * any, must include the following acknowlegement: + * "This product includes software developed by the + * Apache Software Foundation (http://www.apache.org/)." + * Alternately, this acknowlegement may appear in the software itself, + * if and wherever such third-party acknowlegements normally appear. + * + * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software + * Foundation" must not be used to endorse or promote products derived + * from this software without prior written permission. For written + * permission, please contact [EMAIL PROTECTED] + * + * 5. Products derived from this software may not be called "Apache" + * nor may "Apache" appear in their names without prior written + * permission of the Apache Group. + * + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR + * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * <http://www.apache.org/>. 
+ * + * [Additional notices, if required by prior licensing conditions] + * + */ + + package org.apache.catalina.cluster; import org.apache.catalina.Session; +import javax.servlet.http.HttpSession; -public interface ClusterSession extends Session { +public interface ClusterSession extends Session, HttpSession { /** * returns true if this session is the primary session, if that is the * case, the manager can expire it upon timeout. @@ -16,4 +81,4 @@ */ public void setPrimarySession(boolean primarySession); -} \ No newline at end of file +} 1.5 +8 -17 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java Index: SessionMessage.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- SessionMessage.java 16 Nov 2003 22:22:45 -0000 1.4 +++ SessionMessage.java 12 Jan 2004 05:23:10 -0000 1.5 @@ -113,15 +113,10 @@ */ public static final int EVT_SESSION_CREATED = 1; /** - * Event type used when a session has expired, but we don't - * want to notify the session listeners + * Event type used when a session has expired */ - public static final int EVT_SESSION_EXPIRED_WONOTIFY = 2; - /** - * Event type used when a session has expired, and we do - * want to notify the session listeners - */ - public static final int EVT_SESSION_EXPIRED_WNOTIFY = 7; + public static final int EVT_SESSION_EXPIRED = 2; + /** * Event type used when a session has been accessed (ie, last access time * has been updated. 
This is used so that the replicated sessions will not expire @@ -153,9 +148,6 @@ private int mEvtType = -1; private byte[] mSession; private String mSessionID; -// private String mAttributeName; -// private Object mAttributeValue; -// private SerializablePrincipal mPrincipal; private Member mSrc; private String mContextName; private long serializationTimestamp; @@ -235,11 +227,10 @@ switch (mEvtType) { case EVT_SESSION_CREATED : return "SESSION-MODIFIED"; - case EVT_SESSION_EXPIRED_WNOTIFY : return "SESSION-EXPIRED-WITH-NOTIFY"; - case EVT_SESSION_EXPIRED_WONOTIFY : return "SESSION-EXPIRED-WITHOUT-NOTIFY"; + case EVT_SESSION_EXPIRED : return "SESSION-EXPIRED"; case EVT_SESSION_ACCESSED : return "SESSION-ACCESSED"; case EVT_GET_ALL_SESSIONS : return "SESSION-GET-ALL"; - //case EVT_SET_USER_PRINCIPAL : return "SET-USER-PRINCIPAL"; + case EVT_SESSION_DELTA : return "SESSION-DELTA"; case EVT_ALL_SESSION_DATA : return "ALL-SESSION-DATA"; default : return "UNKNOWN-EVENT-TYPE"; } 1.2 +226 -190 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java Index: DeltaManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- DeltaManager.java 14 Nov 2003 20:07:52 -0000 1.1 +++ DeltaManager.java 12 Jan 2004 05:23:10 -0000 1.2 @@ -68,10 +68,8 @@ import java.beans.PropertyChangeListener; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -93,6 +91,11 @@ import 
org.apache.catalina.util.LifecycleSupport; import org.apache.catalina.session.ManagerBase; +import org.apache.catalina.cluster.ClusterManager; +import org.apache.catalina.cluster.SessionMessage; +import org.apache.catalina.cluster.Member; +import org.apache.catalina.cluster.tcp.SimpleTcpCluster; + /** * The DeltaManager manages replicated sessions by only * replicating the deltas in data. For applications written @@ -113,34 +116,13 @@ public class DeltaManager extends ManagerBase - implements Lifecycle, PropertyChangeListener { + implements Lifecycle, PropertyChangeListener, ClusterManager { // ---------------------------------------------------- Security Classes - private class PrivilegedDoLoad - implements PrivilegedExceptionAction { - - PrivilegedDoLoad() { - } - - public Object run() throws Exception{ - doLoad(); - return null; - } - } - private class PrivilegedDoUnload - implements PrivilegedExceptionAction { - - PrivilegedDoUnload() { - } - - public Object run() throws Exception{ - doUnload(); - return null; - } - - } + public static org.apache.commons.logging.Log log = + org.apache.commons.logging.LogFactory.getLog( DeltaManager.class ); // ----------------------------------------------------- Instance Variables @@ -148,7 +130,7 @@ /** * The descriptive information about this implementation. */ - private static final String info = "StandardManager/1.0"; + private static final String info = "DeltaManager/1.0"; /** @@ -166,7 +148,7 @@ /** * The descriptive name of this Manager implementation (for logging). 
*/ - protected static String name = "StandardManager"; + protected static String name = "DeltaManager"; /** @@ -190,6 +172,13 @@ int expiredSessions=0; long processingTime=0; + private SimpleTcpCluster cluster = null; + private boolean stateTransferred; + // ------------------------------------------------------------- Constructor + public DeltaManager(SimpleTcpCluster cluster) { + super(); + this.cluster = cluster; + } // ------------------------------------------------------------- Properties @@ -348,6 +337,10 @@ * instantiated for any reason */ public Session createSession() { + return createSession(true); + } + + public Session createSession(boolean distribute) { if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) { @@ -384,6 +377,14 @@ session.setId(sessionId); sessionCounter++; + if ( distribute ) { + SessionMessage msg = new SessionMessage( + getName(), + SessionMessage.EVT_SESSION_CREATED, + null, + sessionId); + cluster.send(msg); + } return (session); @@ -399,35 +400,6 @@ } - /** - * Load any currently active sessions that were previously unloaded - * to the appropriate persistence mechanism, if any. If persistence is not - * supported, this method returns without doing anything. 
- * - * @exception ClassNotFoundException if a serialized class cannot be - * found during the reload - * @exception IOException if an input/output error occurs - */ - public void load() throws ClassNotFoundException, IOException { - if (System.getSecurityManager() != null){ - try{ - AccessController.doPrivileged( new PrivilegedDoLoad() ); - } catch (PrivilegedActionException ex){ - Exception exception = ex.getException(); - if (exception instanceof ClassNotFoundException){ - throw (ClassNotFoundException)exception; - } else if (exception instanceof IOException){ - throw (IOException)exception; - } - if (log.isDebugEnabled()) - log.debug("Unreported exception in load() " - + exception); - } - } else { - doLoad(); - } - } - /** * Load any currently active sessions that were previously unloaded @@ -438,43 +410,28 @@ * found during the reload * @exception IOException if an input/output error occurs */ - private void doLoad() throws ClassNotFoundException, IOException { - if (log.isDebugEnabled()) - log.debug("Start: Loading persisted sessions"); + private void doLoad(byte[] data) throws ClassNotFoundException, IOException { // Initialize our internal data structures - sessions.clear(); + //sessions.clear(); //should not do this // Open an input stream to the specified pathname, if any - File file = file(); - if (file == null) - return; - if (log.isDebugEnabled()) - log.debug(sm.getString("standardManager.loading", pathname)); - FileInputStream fis = null; + ByteArrayInputStream fis = null; ObjectInputStream ois = null; Loader loader = null; ClassLoader classLoader = null; try { - fis = new FileInputStream(file.getAbsolutePath()); + fis = new ByteArrayInputStream(data); BufferedInputStream bis = new BufferedInputStream(fis); if (container != null) loader = container.getLoader(); if (loader != null) classLoader = loader.getClassLoader(); if (classLoader != null) { - if (log.isDebugEnabled()) - log.debug("Creating custom object input stream for class loader "); ois = new 
CustomObjectInputStream(bis, classLoader); } else { - if (log.isDebugEnabled()) - log.debug("Creating standard object input stream"); ois = new ObjectInputStream(bis); } - } catch (FileNotFoundException e) { - if (log.isDebugEnabled()) - log.debug("No persisted data file found"); - return; } catch (IOException e) { log.error(sm.getString("standardManager.loading.ioe", e), e); if (ois != null) { @@ -493,14 +450,11 @@ try { Integer count = (Integer) ois.readObject(); int n = count.intValue(); - if (log.isDebugEnabled()) - log.debug("Loading " + n + " persisted sessions"); for (int i = 0; i < n; i++) { DeltaSession session = getNewDeltaSession(); session.readObjectData(ois); session.setManager(this); sessions.put(session.getId(), session); - session.activate(); } } catch (ClassNotFoundException e) { log.error(sm.getString("standardManager.loading.cnfe", e), e); @@ -533,41 +487,13 @@ // ignored } - // Delete the persistent storage file - if (file != null && file.exists() ) - file.delete(); } } - if (log.isDebugEnabled()) - log.debug("Finish: Loading persisted sessions"); } - /** - * Save any currently active sessions in the appropriate persistence - * mechanism, if any. If persistence is not supported, this method - * returns without doing anything. 
- * - * @exception IOException if an input/output error occurs - */ - public void unload() throws IOException { - if (System.getSecurityManager() != null){ - try{ - AccessController.doPrivileged( new PrivilegedDoUnload() ); - } catch (PrivilegedActionException ex){ - Exception exception = ex.getException(); - if (exception instanceof IOException){ - throw (IOException)exception; - } - if (log.isDebugEnabled()) - log.debug("Unreported exception in unLoad() " - + exception); - } - } else { - doUnload(); - } - } + /** @@ -577,21 +503,14 @@ * * @exception IOException if an input/output error occurs */ - private void doUnload() throws IOException { + private byte[] doUnload() throws IOException { - if (log.isDebugEnabled()) - log.debug("Unloading persisted sessions"); // Open an output stream to the specified pathname, if any - File file = file(); - if (file == null) - return; - if (log.isDebugEnabled()) - log.debug(sm.getString("standardManager.unloading", pathname)); - FileOutputStream fos = null; + ByteArrayOutputStream fos = null; ObjectOutputStream oos = null; try { - fos = new FileOutputStream(file.getAbsolutePath()); + fos = new ByteArrayOutputStream(); oos = new ObjectOutputStream(new BufferedOutputStream(fos)); } catch (IOException e) { log.error(sm.getString("standardManager.unloading.ioe", e), e); @@ -609,8 +528,6 @@ // Write the number of active sessions, followed by the details ArrayList list = new ArrayList(); synchronized (sessions) { - if (log.isDebugEnabled()) - log.debug("Unloading " + sessions.size() + " sessions"); try { oos.writeObject(new Integer(sessions.size())); Iterator elements = sessions.values().iterator(); @@ -618,9 +535,11 @@ DeltaSession session = (DeltaSession) elements.next(); list.add(session); - ((DeltaSession) session).passivate(); session.writeObjectData(oos); } + oos.flush(); + oos.close(); + oos = null; } catch (IOException e) { log.error(sm.getString("standardManager.unloading.ioe", e), e); if (oos != null) { @@ -636,38 +555,7 @@ 
} // Flush and close the output stream - try { - oos.flush(); - oos.close(); - oos = null; - } catch (IOException e) { - if (oos != null) { - try { - oos.close(); - } catch (IOException f) { - ; - } - oos = null; - } - throw e; - } - - // Expire all the sessions we just wrote - if (log.isDebugEnabled()) - log.debug("Expiring " + list.size() + " persisted sessions"); - Iterator expires = list.iterator(); - while (expires.hasNext()) { - DeltaSession session = (DeltaSession) expires.next(); - try { - session.expire(false); - } catch (Throwable t) { - ; - } - } - - if (log.isDebugEnabled()) - log.debug("Unloading complete"); - + return fos.toByteArray(); } @@ -737,7 +625,43 @@ // Load unloaded sessions, if any try { - load(); + //the channel is already running + log.info("Starting clustering manager...:"+getName()); + if ( cluster == null ) { + log.info("Starting... no cluster associated with this context:"+getName()); + return; + } + + if (cluster.getMembers().length > 0) { + Member mbr = cluster.getMembers()[0]; + SessionMessage msg = + new SessionMessage(this.getName(), + SessionMessage.EVT_GET_ALL_SESSIONS, + null, + "GET-ALL"); + cluster.send(msg, mbr); + log.warn("Manager["+getName()+"], requesting session state from "+mbr+ + ". This operation will timeout if no session state has been received within "+ + "60 seconds"); + long reqStart = System.currentTimeMillis(); + long reqNow = 0; + boolean isTimeout=false; + do { + try { + Thread.currentThread().sleep(100); + }catch ( Exception sleep) {} + reqNow = System.currentTimeMillis(); + isTimeout=((reqNow-reqStart)>(1000*60)); + } while ( (!getStateTransferred()) && (!isTimeout)); + if ( isTimeout || (!getStateTransferred()) ) { + log.error("Manager["+getName()+"], No session state received, timing out."); + }else { + log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms."); + } + } else { + log.info("Manager["+getName()+"], skipping state transfer. 
No members active in cluster group."); + }//end if + } catch (Throwable t) { log.error(sm.getString("standardManager.managerLoad"), t); } @@ -765,13 +689,6 @@ lifecycle.fireLifecycleEvent(STOP_EVENT, null); started = false; - // Write out sessions - try { - unload(); - } catch (IOException e) { - log.error(sm.getString("standardManager.managerUnload"), e); - } - // Expire all active sessions Session sessions[] = findSessions(); for (int i = 0; i < sessions.length; i++) { @@ -822,36 +739,135 @@ } + // -------------------------------------------------------- Replication Methods - // -------------------------------------------------------- Private Methods + /** + * A message was received from another node, this + * is the callback method to implement if you are interested in + * receiving replication messages. + * @param msg - the message received. + */ + public void messageDataReceived(SessionMessage msg) { + + } + + /** + * When the request has been completed, the replication valve + * will notify the manager, and the manager will decide whether + * any replication is needed or not. + * If there is a need for replication, the manager will + * create a session message and that will be replicated. + * The cluster determines where it gets sent. + * @param sessionId - the sessionId that just completed. + * @return a SessionMessage to be sent, + */ + public SessionMessage requestCompleted(String sessionId) { + return null; + } + /** + * When the manager expires session not tied to a request. + * The cluster will periodically ask for a list of sessions + * that should expire and that should be sent across the wire. + * @return + */ + public String[] getInvalidatedSessions() { + return null; + } - /** - * Return a File object representing the pathname to our - * persistence file, if any. 
- */ - private File file() { - if ((pathname == null) || (pathname.length() == 0)) - return (null); - File file = new File(pathname); - if (!file.isAbsolute()) { - if (container instanceof Context) { - ServletContext servletContext = - ((Context) container).getServletContext(); - File tempdir = (File) - servletContext.getAttribute(Globals.WORK_DIR_ATTR); - if (tempdir != null) - file = new File(tempdir, pathname); - } - } -// if (!file.isAbsolute()) -// return (null); - return (file); + /** + * This method is called by the received thread when a SessionMessage has + * been received from one of the other nodes in the cluster. + * @param msg - the message received + * @param sender - the sender of the message, this is used if we receive a + * EVT_GET_ALL_SESSION message, so that we only reply to + * the requesting node + */ + protected void messageReceived(SessionMessage msg, Member sender) { + try { + log.debug("Received SessionMessage of type=" + msg.getEventTypeString()+" from "+sender); + switch (msg.getEventType()) { + case SessionMessage.EVT_GET_ALL_SESSIONS: { + //get a list of all the session from this manager + Object[] sessions = findSessions(); + java.io.ByteArrayOutputStream bout = new java.io. + ByteArrayOutputStream(); + java.io.ObjectOutputStream oout = new java.io. 
+ ObjectOutputStream(bout); + oout.writeInt(sessions.length); + for (int i = 0; i < sessions.length; i++) { + ReplicatedSession ses = (ReplicatedSession) sessions[i]; + oout.writeUTF(ses.getId()); + byte[] data = null;//todo writeSession(ses); + oout.writeObject(data); + } //for + //don't send a message if we don't have to + oout.flush(); + oout.close(); + byte[] data = bout.toByteArray(); + SessionMessage newmsg = new SessionMessage(name, + SessionMessage.EVT_ALL_SESSION_DATA, + data, ""); + cluster.send(newmsg, sender); + break; + } + case SessionMessage.EVT_ALL_SESSION_DATA: { + java.io.ByteArrayInputStream bin = + new java.io.ByteArrayInputStream(msg.getSession()); + java.io.ObjectInputStream oin = new java.io. + ObjectInputStream(bin); + int size = oin.readInt(); + for (int i = 0; i < size; i++) { + String id = oin.readUTF(); + byte[] data = (byte[]) oin.readObject(); + Session session = null; //todo readSession(data, id); + session.setManager(this); + add(session); + } //for + stateTransferred = true; + break; + } + case SessionMessage.EVT_SESSION_CREATED: { + Session session = createSession(false); + session.setManager(this); + session.setId(msg.getSessionID()); + session.setNew(false); + break; + } + case SessionMessage.EVT_SESSION_EXPIRED: { + Session session = findSession(msg.getSessionID()); + if (session != null) { + session.expire(); + this.remove(session); + } //end if + break; + } + case SessionMessage.EVT_SESSION_ACCESSED: { + Session session = findSession(msg.getSessionID()); + if (session != null) { + session.access(); + } + break; + } + default: { + //we didn't recognize the message type, do nothing + break; + } + } //switch + } + catch (Exception x) { + log.error("Unable to receive message through TCP channel", x); + } + } - } + // -------------------------------------------------------- Private Methods + + public void backgroundProcess() { + processExpires(); + } /** * Invalidate all sessions that have expired. 
*/ @@ -874,6 +890,26 @@ } long timeEnd = System.currentTimeMillis(); processingTime += ( timeEnd - timeNow ); + + } + public boolean getStateTransferred() { + return stateTransferred; + } + public void setStateTransferred(boolean stateTransferred) { + this.stateTransferred = stateTransferred; + } + public SimpleTcpCluster getCluster() { + return cluster; + } + public void setCluster(SimpleTcpCluster cluster) { + this.cluster = cluster; + } + + public void load() { + + } + + public void unload() { } 1.5 +8 -48 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java Index: DeltaSession.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- DeltaSession.java 18 Nov 2003 19:27:55 -0000 1.4 +++ DeltaSession.java 12 Jan 2004 05:23:10 -0000 1.5 @@ -757,50 +757,6 @@ /** - * Perform the internal processing required to passivate - * this session. - */ - public void passivate() { - - // Notify ActivationListeners - HttpSessionEvent event = null; - String keys[] = keys(); - for (int i = 0; i < keys.length; i++) { - Object attribute = getAttributeInternal(keys[i]); - if (attribute instanceof HttpSessionActivationListener) { - if (event == null) - event = new HttpSessionEvent(this); - // FIXME: Should we catch throwables? - ((HttpSessionActivationListener)attribute).sessionWillPassivate(event); - } - } - - } - - - /** - * Perform internal processing required to activate this - * session. 
- */ - public void activate() { - - // Notify ActivationListeners - HttpSessionEvent event = null; - String keys[] = keys(); - for (int i = 0; i < keys.length; i++) { - Object attribute = getAttributeInternal(keys[i]); - if (attribute instanceof HttpSessionActivationListener) { - if (event == null) - event = new HttpSessionEvent(this); - // FIXME: Should we catch throwables? - ((HttpSessionActivationListener)attribute).sessionDidActivate(event); - } - } - - } - - - /** * Return the object bound with the specified name to the internal notes * for this session, or <code>null</code> if no such binding exists. * @@ -1304,6 +1260,10 @@ if (value == null) { removeAttribute(name); return; + } + + if ( ! (value instanceof java.io.Serializable) ) { + throw new IllegalArgumentException("Attribute ["+name+"] is not serializable"); } // Validate our current state 1.9 +27 -4 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicatedSession.java Index: ReplicatedSession.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicatedSession.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- ReplicatedSession.java 14 Nov 2003 21:32:23 -0000 1.8 +++ ReplicatedSession.java 12 Jan 2004 05:23:10 -0000 1.9 @@ -97,11 +97,13 @@ import java.io.PrintWriter; import java.io.StringWriter; -public class ReplicatedSession extends org.apache.catalina.session.StandardSession { +public class ReplicatedSession extends org.apache.catalina.session.StandardSession +implements org.apache.catalina.cluster.ClusterSession{ private transient Manager mManager = null; protected boolean isDirty = false; private transient long lastAccessWasDistributed = System.currentTimeMillis(); + private boolean isPrimarySession=true; public ReplicatedSession(Manager manager) { super(manager); @@ -237,6 +239,27 @@ 
super.writeObjectData(stream); + } + + + + + + /** + * returns true if this session is the primary session, if that is the + * case, the manager can expire it upon timeout. + * @return + */ + public boolean isPrimarySession() { + return isPrimarySession; + } + + /** + * Sets whether this is the primary session or not. + * @param primarySession + */ + public void setPrimarySession(boolean primarySession) { + this.isPrimarySession=primarySession; } 1.19 +6 -16 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java Index: SimpleTcpReplicationManager.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v retrieving revision 1.18 retrieving revision 1.19 diff -u -r1.18 -r1.19 --- SimpleTcpReplicationManager.java 9 Jan 2004 23:24:09 -0000 1.18 +++ SimpleTcpReplicationManager.java 12 Jan 2004 05:23:10 -0000 1.19 @@ -211,11 +211,7 @@ super.unload(); } } - public void load() throws ClassNotFoundException, IOException { - if ( !getDistributable() ) { - super.load(); - } - } + public int getDebug() { return this.debug; @@ -334,7 +330,7 @@ synchronized ( invalidatedSessions ) { invalidatedSessions.remove(sessionId); SessionMessage msg = new SessionMessage(name, - SessionMessage.EVT_SESSION_EXPIRED_WNOTIFY, + SessionMessage.EVT_SESSION_EXPIRED, null, sessionId); return msg; @@ -568,11 +564,6 @@ try { log("Received SessionMessage of type="+msg.getEventTypeString(),3); log("Received SessionMessage sender="+sender,3); - if ( !this.getDistributable() ) { - log.warn("Received replication message, although this context["+ - getName()+"] is not distributable. 
Ignoring message"); - return; - } switch ( msg.getEventType() ) { case SessionMessage.EVT_GET_ALL_SESSIONS: { //get a list of all the session from this manager @@ -620,8 +611,7 @@ if ( getDebug() > 5 ) log("Received replicated session="+session); break; } - case SessionMessage.EVT_SESSION_EXPIRED_WNOTIFY: - case SessionMessage.EVT_SESSION_EXPIRED_WONOTIFY: { + case SessionMessage.EVT_SESSION_EXPIRED: { Session session = findSession(msg.getSessionID()); if ( session != null ) { session.expire(); 1.1 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java Index: DeltaRequest.java =================================================================== /* * $Header: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java,v 1.1 2004/01/12 05:23:10 fhanik Exp $ * $Revision: 1.1 $ * $Date: 2004/01/12 05:23:10 $ * * ==================================================================== * * The Apache Software License, Version 1.1 * * Copyright (c) 1999 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, if * any, must include the following acknowlegement: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowlegement may appear in the software itself, * if and wherever such third-party acknowlegements normally appear. * * 4. 
The names "The Jakarta Project", "Tomcat", and "Apache Software * Foundation" must not be used to endorse or promote products derived * from this software without prior written permission. For written * permission, please contact [EMAIL PROTECTED] * * 5. Products derived from this software may not be called "Apache" * nor may "Apache" appear in their names without prior written * permission of the Apache Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * [Additional notices, if required by prior licensing conditions] * */ package org.apache.catalina.cluster.session; /** * This class is used to track the series of actions that happens when * a request is executed. 
These actions will then be replayed, in the order they were recorded, on a ClusterSession by calling execute(ClusterSession). * @author <a href="mailto:[EMAIL PROTECTED]">Filip Hanik</a> * @version 1.0 */ import java.util.Vector; import javax.servlet.http.HttpSession; import java.io.Serializable; import java.security.Principal; import org.apache.catalina.realm.GenericPrincipal; import org.apache.catalina.cluster.ClusterSession; public class DeltaRequest implements Serializable { public static final int TYPE_ATTRIBUTE = 0; public static final int TYPE_PRINCIPAL = 1; public static final int TYPE_ISNEW = 2; public static final int TYPE_MAXINTERVAL = 3; public static final int ACTION_SET = 0; public static final int ACTION_REMOVE = 1; public static final String NAME_PRINCIPAL = "__SET__PRINCIPAL__"; public static final String NAME_MAXINTERVAL = "__SET__MAXINTERVAL__"; public static final String NAME_ISNEW = "__SET__ISNEW__"; private String sessionId; private Vector actions = new Vector(); private boolean recordAllActions = false; public DeltaRequest(String sessionId, boolean recordAllActions) { this.sessionId = sessionId; this.recordAllActions = recordAllActions; } public void setAttribute(String name, Object value) { int action = (value==null)?ACTION_REMOVE:ACTION_SET; addAction(TYPE_ATTRIBUTE,action,name,value); } public void removeAttribute(String name) { int action = ACTION_REMOVE; addAction(TYPE_ATTRIBUTE,action,name,null); } public void setMaxInactiveInterval(int interval) { int action = ACTION_SET; addAction(TYPE_MAXINTERVAL,action,NAME_MAXINTERVAL,new Integer(interval)); } public void setPrincipal(Principal p) { int action = (p==null)?ACTION_REMOVE:ACTION_SET; SerializablePrincipal sp = null; if ( p != null ) { sp = SerializablePrincipal.createPrincipal((GenericPrincipal)p); } addAction(TYPE_PRINCIPAL,action,NAME_PRINCIPAL,sp); } public void setNew(boolean n) { int action = ACTION_SET; addAction(TYPE_ISNEW,action,NAME_ISNEW,new Boolean(n)); } protected void addAction(int type, int action, String name, Object value) { AttributeInfo info = new
AttributeInfo(type,action,name,value); //if we have already done something to this attribute, make sure //we don't send multiple actions across the wire if ( !recordAllActions) actions.remove(info); //add the action actions.addElement(info); } public void execute(ClusterSession session) { if ( !this.sessionId.equals( session.getId() ) ) throw new java.lang.IllegalArgumentException("Session id mismatch, not executing the delta request"); for ( int i=0; i<actions.size(); i++ ) { AttributeInfo info = (AttributeInfo)actions.get(i); switch ( info.getType() ) { case TYPE_ATTRIBUTE: { if ( info.getAction() == ACTION_SET ) session.setAttribute(info.getName(),info.getValue()); else session.removeAttribute(info.getName()); break; }//case case TYPE_ISNEW: { session.setNew(((Boolean)info.getValue()).booleanValue()); break; }//case case TYPE_MAXINTERVAL: { session.setMaxInactiveInterval(((Integer)info.getValue()).intValue()); break; }//case case TYPE_PRINCIPAL: { Principal p = null; if ( info.getAction() == ACTION_SET ) { SerializablePrincipal sp = (SerializablePrincipal)info.getValue(); p = (Principal)sp.getPrincipal(session.getManager().getContainer().getRealm()); } session.setPrincipal(p); break; }//case default : throw new java.lang.IllegalArgumentException("Invalid attribute info type="+info); }//switch }//for } public static class AttributeInfo implements java.io.Serializable { private String name = null; private Object value = null; private int action; private int type; public AttributeInfo(int type, int action, String name, Object value) { this.name = name; this.value = value; this.action = action; this.type = type; } public int getType() { return type; } public int getAction() { return action; } public Object getValue() { return value; } public int hashCode() { return name.hashCode(); } public String getName() { return name; } public boolean equals(Object o) { if ( ! 
(o instanceof AttributeInfo ) ) return false; AttributeInfo other = (AttributeInfo)o; return other.getName().equals(this.getName()); } } } 1.7 +15 -6 jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java Index: ReplicationValve.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- ReplicationValve.java 15 Nov 2003 00:58:20 -0000 1.6 +++ ReplicationValve.java 12 Jan 2004 05:23:11 -0000 1.7 @@ -91,6 +91,7 @@ import org.apache.catalina.cluster.session.SimpleTcpReplicationManager; import org.apache.catalina.cluster.SessionMessage; import org.apache.catalina.cluster.tcp.SimpleTcpCluster; +import org.apache.catalina.cluster.ClusterSession; /** * <p>Implementation of a Valve that logs interesting contents from the @@ -235,9 +236,17 @@ if ( debug > 4 ) log("Invoking replication request on "+uri,4); + ClusterSession cs = (ClusterSession)session; SessionMessage msg = manager.requestCompleted(id); - if ( msg == null ) return; + if ( (msg == null) && (!cs.isPrimarySession()) ) { + msg = new SessionMessage(manager.getName(), + SessionMessage.EVT_SESSION_ACCESSED, + null, + id); + } + cs.setPrimarySession(true); + if ( msg == null ) return; cluster.send(msg); long stop = System.currentTimeMillis(); @@ -255,7 +264,7 @@ */ public String toString() { - StringBuffer sb = new StringBuffer("RequestDumperValve["); + StringBuffer sb = new StringBuffer("ReplicationValve["); if (container != null) sb.append(container.getName()); sb.append("]");
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]