fhanik      2004/01/11 21:23:11

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster
                        ClusterSession.java SessionMessage.java
               modules/cluster/src/share/org/apache/catalina/cluster/session
                        DeltaManager.java DeltaSession.java
                        ReplicatedSession.java
                        SimpleTcpReplicationManager.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationValve.java
  Added:       modules/cluster/src/share/org/apache/catalina/cluster/session
                        DeltaRequest.java
  Log:
  Started to implement delta replication. This will allow for less data to be sent over the wire.
  
  Revision  Changes    Path
  1.2       +67 -2      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSession.java
  
  Index: ClusterSession.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/ClusterSession.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ClusterSession.java       15 Nov 2003 00:58:20 -0000      1.1
  +++ ClusterSession.java       12 Jan 2004 05:23:10 -0000      1.2
  @@ -1,8 +1,73 @@
  +/*
  + * $Header$
  + * $Revision$
  + * $Date$
  + *
  + * ====================================================================
  + *
  + * The Apache Software License, Version 1.1
  + *
  + * Copyright (c) 1999 The Apache Software Foundation.  All rights
  + * reserved.
  + *
  + * Redistribution and use in source and binary forms, with or without
  + * modification, are permitted provided that the following conditions
  + * are met:
  + *
  + * 1. Redistributions of source code must retain the above copyright
  + *    notice, this list of conditions and the following disclaimer.
  + *
  + * 2. Redistributions in binary form must reproduce the above copyright
  + *    notice, this list of conditions and the following disclaimer in
  + *    the documentation and/or other materials provided with the
  + *    distribution.
  + *
  + * 3. The end-user documentation included with the redistribution, if
  + *    any, must include the following acknowlegement:
  + *       "This product includes software developed by the
  + *        Apache Software Foundation (http://www.apache.org/)."
  + *    Alternately, this acknowlegement may appear in the software itself,
  + *    if and wherever such third-party acknowlegements normally appear.
  + *
  + * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
  + *    Foundation" must not be used to endorse or promote products derived
  + *    from this software without prior written permission. For written
  + *    permission, please contact apache@apache.org.
  + *
  + * 5. Products derived from this software may not be called "Apache"
  + *    nor may "Apache" appear in their names without prior written
  + *    permission of the Apache Group.
  + *
  + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
  + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  + * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
  + * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
  + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
  + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
  + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  + * SUCH DAMAGE.
  + * ====================================================================
  + *
  + * This software consists of voluntary contributions made by many
  + * individuals on behalf of the Apache Software Foundation.  For more
  + * information on the Apache Software Foundation, please see
  + * <http://www.apache.org/>.
  + *
  + * [Additional notices, if required by prior licensing conditions]
  + *
  + */
  +
  +
   package org.apache.catalina.cluster;
   
   import org.apache.catalina.Session;
  +import javax.servlet.http.HttpSession;
   
  -public interface ClusterSession extends Session {
  +public interface ClusterSession extends Session, HttpSession {
      /**
       * returns true if this session is the primary session, if that is the
       * case, the manager can expire it upon timeout.
  @@ -16,4 +81,4 @@
       */
      public void setPrimarySession(boolean primarySession);
   
  -}
  \ No newline at end of file
  +}
  
  
  
  1.5       +8 -17      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java
  
  Index: SessionMessage.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/SessionMessage.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- SessionMessage.java       16 Nov 2003 22:22:45 -0000      1.4
  +++ SessionMessage.java       12 Jan 2004 05:23:10 -0000      1.5
  @@ -113,15 +113,10 @@
        */
       public static final int EVT_SESSION_CREATED = 1;
       /**
  -     * Event type used when a session has expired, but we don't
  -     * want to notify the session listeners
  +     * Event type used when a session has expired
        */
  -    public static final int EVT_SESSION_EXPIRED_WONOTIFY = 2;
  -    /**
  -     * Event type used when a session has expired, and we do
  -     * want to notify the session listeners
  -     */
  -    public static final int EVT_SESSION_EXPIRED_WNOTIFY = 7;
  +    public static final int EVT_SESSION_EXPIRED = 2;
  +
       /**
        * Event type used when a session has been accessed (ie, last access time
        * has been updated. This is used so that the replicated sessions will not expire
  @@ -153,9 +148,6 @@
       private int mEvtType = -1;
       private byte[] mSession;
       private String mSessionID;
  -//    private String mAttributeName;
  -//    private Object mAttributeValue;
  -//    private SerializablePrincipal mPrincipal;
       private Member mSrc;
       private String mContextName;
       private long serializationTimestamp;
  @@ -235,11 +227,10 @@
           switch (mEvtType)
           {
               case EVT_SESSION_CREATED : return "SESSION-MODIFIED";
  -            case EVT_SESSION_EXPIRED_WNOTIFY : return "SESSION-EXPIRED-WITH-NOTIFY";
  -            case EVT_SESSION_EXPIRED_WONOTIFY : return "SESSION-EXPIRED-WITHOUT-NOTIFY";
  +            case EVT_SESSION_EXPIRED : return "SESSION-EXPIRED";
               case EVT_SESSION_ACCESSED : return "SESSION-ACCESSED";
               case EVT_GET_ALL_SESSIONS : return "SESSION-GET-ALL";
  -            //case EVT_SET_USER_PRINCIPAL : return "SET-USER-PRINCIPAL";
  +            case EVT_SESSION_DELTA : return "SESSION-DELTA";
               case EVT_ALL_SESSION_DATA : return "ALL-SESSION-DATA";
               default : return "UNKNOWN-EVENT-TYPE";
           }
  
  
  
  1.2       +226 -190   jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
  
  Index: DeltaManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DeltaManager.java 14 Nov 2003 20:07:52 -0000      1.1
  +++ DeltaManager.java 12 Jan 2004 05:23:10 -0000      1.2
  @@ -68,10 +68,8 @@
   import java.beans.PropertyChangeListener;
   import java.io.BufferedInputStream;
   import java.io.BufferedOutputStream;
  -import java.io.File;
  -import java.io.FileInputStream;
  -import java.io.FileNotFoundException;
  -import java.io.FileOutputStream;
  +import java.io.ByteArrayInputStream;
  +import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
  @@ -93,6 +91,11 @@
   import org.apache.catalina.util.LifecycleSupport;
   
   import org.apache.catalina.session.ManagerBase;
  +import org.apache.catalina.cluster.ClusterManager;
  +import org.apache.catalina.cluster.SessionMessage;
  +import org.apache.catalina.cluster.Member;
  +import org.apache.catalina.cluster.tcp.SimpleTcpCluster;
  +
   /**
    * The DeltaManager manages replicated sessions by only
    * replicating the deltas in data. For applications written
  @@ -113,34 +116,13 @@
   
   public class DeltaManager
       extends ManagerBase
  -    implements Lifecycle, PropertyChangeListener {
  +    implements Lifecycle, PropertyChangeListener, ClusterManager {
   
       // ---------------------------------------------------- Security Classes
  -    private class PrivilegedDoLoad
  -        implements PrivilegedExceptionAction {
  -
  -        PrivilegedDoLoad() {
  -        }
  -
  -        public Object run() throws Exception{
  -           doLoad();
  -           return null;
  -        }
  -    }
   
  -    private class PrivilegedDoUnload
  -        implements PrivilegedExceptionAction {
  -
  -        PrivilegedDoUnload() {
  -        }
  -
  -        public Object run() throws Exception{
  -            doUnload();
  -            return null;
  -        }
  -
  -    }
   
  +    public static org.apache.commons.logging.Log log =
  +        org.apache.commons.logging.LogFactory.getLog( DeltaManager.class );
   
       // ----------------------------------------------------- Instance Variables
   
  @@ -148,7 +130,7 @@
       /**
        * The descriptive information about this implementation.
        */
  -    private static final String info = "StandardManager/1.0";
  +    private static final String info = "DeltaManager/1.0";
   
   
       /**
  @@ -166,7 +148,7 @@
       /**
        * The descriptive name of this Manager implementation (for logging).
        */
  -    protected static String name = "StandardManager";
  +    protected static String name = "DeltaManager";
   
   
       /**
  @@ -190,6 +172,13 @@
       int expiredSessions=0;
       long processingTime=0;
   
  +    private SimpleTcpCluster cluster = null;
  +    private boolean stateTransferred;
  +    // ------------------------------------------------------------- Constructor
  +    public DeltaManager(SimpleTcpCluster cluster) {
  +        super();
  +        this.cluster = cluster;
  +    }
   
       // ------------------------------------------------------------- Properties
   
  @@ -348,6 +337,10 @@
        *  instantiated for any reason
        */
       public Session createSession() {
  +        return createSession(true);
  +    }
  +
  +    public Session createSession(boolean distribute) {
   
         if ((maxActiveSessions >= 0) &&
             (sessions.size() >= maxActiveSessions)) {
  @@ -384,6 +377,14 @@
   
         session.setId(sessionId);
         sessionCounter++;
  +      if ( distribute ) {
  +          SessionMessage msg = new SessionMessage(
  +              getName(),
  +              SessionMessage.EVT_SESSION_CREATED,
  +              null,
  +              sessionId);
  +          cluster.send(msg);
  +      }
   
         return (session);
   
  @@ -399,35 +400,6 @@
       }
   
   
  -    /**
  -     * Load any currently active sessions that were previously unloaded
  -     * to the appropriate persistence mechanism, if any.  If persistence is not
  -     * supported, this method returns without doing anything.
  -     *
  -     * @exception ClassNotFoundException if a serialized class cannot be
  -     *  found during the reload
  -     * @exception IOException if an input/output error occurs
  -     */
  -    public void load() throws ClassNotFoundException, IOException {
  -        if (System.getSecurityManager() != null){
  -            try{
  -                AccessController.doPrivileged( new PrivilegedDoLoad() );
  -            } catch (PrivilegedActionException ex){
  -                Exception exception = ex.getException();
  -                if (exception instanceof ClassNotFoundException){
  -                    throw (ClassNotFoundException)exception;
  -                } else if (exception instanceof IOException){
  -                    throw (IOException)exception;
  -                }
  -                if (log.isDebugEnabled())
  -                    log.debug("Unreported exception in load() "
  -                        + exception);
  -            }
  -        } else {
  -            doLoad();
  -        }
  -    }
  -
   
       /**
        * Load any currently active sessions that were previously unloaded
  @@ -438,43 +410,28 @@
        *  found during the reload
        * @exception IOException if an input/output error occurs
        */
  -    private void doLoad() throws ClassNotFoundException, IOException {
  -        if (log.isDebugEnabled())
  -            log.debug("Start: Loading persisted sessions");
  +    private void doLoad(byte[] data) throws ClassNotFoundException, IOException {
   
           // Initialize our internal data structures
  -        sessions.clear();
  +        //sessions.clear(); //should not do this
   
           // Open an input stream to the specified pathname, if any
  -        File file = file();
  -        if (file == null)
  -            return;
  -        if (log.isDebugEnabled())
  -            log.debug(sm.getString("standardManager.loading", pathname));
  -        FileInputStream fis = null;
  +        ByteArrayInputStream fis = null;
           ObjectInputStream ois = null;
           Loader loader = null;
           ClassLoader classLoader = null;
           try {
  -            fis = new FileInputStream(file.getAbsolutePath());
  +            fis = new ByteArrayInputStream(data);
               BufferedInputStream bis = new BufferedInputStream(fis);
               if (container != null)
                   loader = container.getLoader();
               if (loader != null)
                   classLoader = loader.getClassLoader();
               if (classLoader != null) {
  -                if (log.isDebugEnabled())
  -                    log.debug("Creating custom object input stream for class loader ");
                   ois = new CustomObjectInputStream(bis, classLoader);
               } else {
  -                if (log.isDebugEnabled())
  -                    log.debug("Creating standard object input stream");
                   ois = new ObjectInputStream(bis);
               }
  -        } catch (FileNotFoundException e) {
  -            if (log.isDebugEnabled())
  -                log.debug("No persisted data file found");
  -            return;
           } catch (IOException e) {
               log.error(sm.getString("standardManager.loading.ioe", e), e);
               if (ois != null) {
  @@ -493,14 +450,11 @@
               try {
                   Integer count = (Integer) ois.readObject();
                   int n = count.intValue();
  -                if (log.isDebugEnabled())
  -                    log.debug("Loading " + n + " persisted sessions");
                   for (int i = 0; i < n; i++) {
                       DeltaSession session = getNewDeltaSession();
                       session.readObjectData(ois);
                       session.setManager(this);
                       sessions.put(session.getId(), session);
  -                    session.activate();
                   }
               } catch (ClassNotFoundException e) {
                 log.error(sm.getString("standardManager.loading.cnfe", e), e);
  @@ -533,41 +487,13 @@
                       // ignored
                   }
   
  -                // Delete the persistent storage file
  -                if (file != null && file.exists() )
  -                    file.delete();
               }
           }
   
  -        if (log.isDebugEnabled())
  -            log.debug("Finish: Loading persisted sessions");
       }
   
   
  -    /**
  -     * Save any currently active sessions in the appropriate persistence
  -     * mechanism, if any.  If persistence is not supported, this method
  -     * returns without doing anything.
  -     *
  -     * @exception IOException if an input/output error occurs
  -     */
  -    public void unload() throws IOException {
  -        if (System.getSecurityManager() != null){
  -            try{
  -                AccessController.doPrivileged( new PrivilegedDoUnload() );
  -            } catch (PrivilegedActionException ex){
  -                Exception exception = ex.getException();
  -                if (exception instanceof IOException){
  -                    throw (IOException)exception;
  -                }
  -                if (log.isDebugEnabled())
  -                    log.debug("Unreported exception in unLoad() "
  -                        + exception);
  -            }
  -        } else {
  -            doUnload();
  -        }
  -    }
  +
   
   
       /**
  @@ -577,21 +503,14 @@
        *
        * @exception IOException if an input/output error occurs
        */
  -    private void doUnload() throws IOException {
  +    private byte[] doUnload() throws IOException {
   
  -        if (log.isDebugEnabled())
  -            log.debug("Unloading persisted sessions");
   
           // Open an output stream to the specified pathname, if any
  -        File file = file();
  -        if (file == null)
  -            return;
  -        if (log.isDebugEnabled())
  -            log.debug(sm.getString("standardManager.unloading", pathname));
  -        FileOutputStream fos = null;
  +        ByteArrayOutputStream fos = null;
           ObjectOutputStream oos = null;
           try {
  -            fos = new FileOutputStream(file.getAbsolutePath());
  +            fos = new ByteArrayOutputStream();
               oos = new ObjectOutputStream(new BufferedOutputStream(fos));
           } catch (IOException e) {
               log.error(sm.getString("standardManager.unloading.ioe", e), e);
  @@ -609,8 +528,6 @@
           // Write the number of active sessions, followed by the details
           ArrayList list = new ArrayList();
           synchronized (sessions) {
  -            if (log.isDebugEnabled())
  -                log.debug("Unloading " + sessions.size() + " sessions");
               try {
                   oos.writeObject(new Integer(sessions.size()));
                   Iterator elements = sessions.values().iterator();
  @@ -618,9 +535,11 @@
                       DeltaSession session =
                           (DeltaSession) elements.next();
                       list.add(session);
  -                    ((DeltaSession) session).passivate();
                       session.writeObjectData(oos);
                   }
  +                oos.flush();
  +                oos.close();
  +                oos = null;
               } catch (IOException e) {
                   log.error(sm.getString("standardManager.unloading.ioe", e), e);
                   if (oos != null) {
  @@ -636,38 +555,7 @@
           }
   
           // Flush and close the output stream
  -        try {
  -            oos.flush();
  -            oos.close();
  -            oos = null;
  -        } catch (IOException e) {
  -            if (oos != null) {
  -                try {
  -                    oos.close();
  -                } catch (IOException f) {
  -                    ;
  -                }
  -                oos = null;
  -            }
  -            throw e;
  -        }
  -
  -        // Expire all the sessions we just wrote
  -        if (log.isDebugEnabled())
  -            log.debug("Expiring " + list.size() + " persisted sessions");
  -        Iterator expires = list.iterator();
  -        while (expires.hasNext()) {
  -            DeltaSession session = (DeltaSession) expires.next();
  -            try {
  -                session.expire(false);
  -            } catch (Throwable t) {
  -                ;
  -            }
  -        }
  -
  -        if (log.isDebugEnabled())
  -            log.debug("Unloading complete");
  -
  +        return fos.toByteArray();
       }
   
   
  @@ -737,7 +625,43 @@
   
           // Load unloaded sessions, if any
           try {
  -            load();
  +            //the channel is already running
  +            log.info("Starting clustering manager...:"+getName());
  +            if ( cluster == null ) {
  +                log.info("Starting... no cluster associated with this context:"+getName());
  +                return;
  +            }
  +
  +            if (cluster.getMembers().length > 0) {
  +                Member mbr = cluster.getMembers()[0];
  +                SessionMessage msg =
  +                    new SessionMessage(this.getName(),
  +                                       SessionMessage.EVT_GET_ALL_SESSIONS,
  +                                       null,
  +                                       "GET-ALL");
  +                cluster.send(msg, mbr);
  +                log.warn("Manager["+getName()+"], requesting session state from "+mbr+
  +                         ". This operation will timeout if no session state has been received within "+
  +                         "60 seconds");
  +                long reqStart = System.currentTimeMillis();
  +                long reqNow = 0;
  +                boolean isTimeout=false;
  +                do {
  +                    try {
  +                        Thread.currentThread().sleep(100);
  +                    }catch ( Exception sleep) {}
  +                    reqNow = System.currentTimeMillis();
  +                    isTimeout=((reqNow-reqStart)>(1000*60));
  +                } while ( (!getStateTransferred()) && (!isTimeout));
  +                if ( isTimeout || (!getStateTransferred()) ) {
  +                    log.error("Manager["+getName()+"], No session state received, timing out.");
  +                }else {
  +                    log.info("Manager["+getName()+"], session state received in "+(reqNow-reqStart)+" ms.");
  +                }
  +            } else {
  +                log.info("Manager["+getName()+"], skipping state transfer. No members active in cluster group.");
  +            }//end if
  +
           } catch (Throwable t) {
               log.error(sm.getString("standardManager.managerLoad"), t);
           }
  @@ -765,13 +689,6 @@
           lifecycle.fireLifecycleEvent(STOP_EVENT, null);
           started = false;
   
  -        // Write out sessions
  -        try {
  -            unload();
  -        } catch (IOException e) {
  -            log.error(sm.getString("standardManager.managerUnload"), e);
  -        }
  -
           // Expire all active sessions
           Session sessions[] = findSessions();
           for (int i = 0; i < sessions.length; i++) {
  @@ -822,36 +739,135 @@
   
       }
   
  +    // -------------------------------------------------------- Replication Methods
   
  -    // -------------------------------------------------------- Private Methods
  +    /**
  +        * A message was received from another node, this
  +        * is the callback method to implement if you are interested in
  +        * receiving replication messages.
  +        * @param msg - the message received.
  +        */
  +       public void messageDataReceived(SessionMessage msg) {
  +
  +       }
  +
  +       /**
  +        * When the request has been completed, the replication valve
  +        * will notify the manager, and the manager will decide whether
  +        * any replication is needed or not.
  +        * If there is a need for replication, the manager will
  +        * create a session message and that will be replicated.
  +        * The cluster determines where it gets sent.
  +        * @param sessionId - the sessionId that just completed.
  +        * @return a SessionMessage to be sent,
  +        */
  +       public SessionMessage requestCompleted(String sessionId) {
  +           return null;
  +       }
   
  +       /**
  +        * When the manager expires session not tied to a request.
  +        * The cluster will periodically ask for a list of sessions
  +        * that should expire and that should be sent across the wire.
  +        * @return
  +        */
  +       public String[] getInvalidatedSessions() {
  +           return null;
  +       }
   
  -    /**
  -     * Return a File object representing the pathname to our
  -     * persistence file, if any.
  -     */
  -    private File file() {
   
  -        if ((pathname == null) || (pathname.length() == 0))
  -            return (null);
  -        File file = new File(pathname);
  -        if (!file.isAbsolute()) {
  -            if (container instanceof Context) {
  -                ServletContext servletContext =
  -                    ((Context) container).getServletContext();
  -                File tempdir = (File)
  -                    servletContext.getAttribute(Globals.WORK_DIR_ATTR);
  -                if (tempdir != null)
  -                    file = new File(tempdir, pathname);
  -            }
  -        }
  -//        if (!file.isAbsolute())
  -//            return (null);
  -        return (file);
  +       /**
  +        * This method is called by the received thread when a SessionMessage has
  +        * been received from one of the other nodes in the cluster.
  +        * @param msg - the message received
  +        * @param sender - the sender of the message, this is used if we receive a
  +        *                 EVT_GET_ALL_SESSION message, so that we only reply to
  +        *                 the requesting node
  +        */
  +       protected void messageReceived(SessionMessage msg, Member sender) {
  +           try {
  +               log.debug("Received SessionMessage of type=" + msg.getEventTypeString()+" from "+sender);
  +               switch (msg.getEventType()) {
  +                   case SessionMessage.EVT_GET_ALL_SESSIONS: {
  +                       //get a list of all the session from this manager
  +                       Object[] sessions = findSessions();
  +                       java.io.ByteArrayOutputStream bout = new java.io.
  +                           ByteArrayOutputStream();
  +                       java.io.ObjectOutputStream oout = new java.io.
  +                           ObjectOutputStream(bout);
  +                       oout.writeInt(sessions.length);
  +                       for (int i = 0; i < sessions.length; i++) {
  +                           ReplicatedSession ses = (ReplicatedSession) sessions[i];
  +                           oout.writeUTF(ses.getId());
  +                           byte[] data = null;//todo writeSession(ses);
  +                           oout.writeObject(data);
  +                       } //for
  +                       //don't send a message if we don't have to
  +                       oout.flush();
  +                       oout.close();
  +                       byte[] data = bout.toByteArray();
  +                       SessionMessage newmsg = new SessionMessage(name,
  +                           SessionMessage.EVT_ALL_SESSION_DATA,
  +                           data, "");
  +                       cluster.send(newmsg, sender);
  +                       break;
  +                   }
  +                   case SessionMessage.EVT_ALL_SESSION_DATA: {
  +                       java.io.ByteArrayInputStream bin =
  +                           new java.io.ByteArrayInputStream(msg.getSession());
  +                       java.io.ObjectInputStream oin = new java.io.
  +                           ObjectInputStream(bin);
  +                       int size = oin.readInt();
  +                       for (int i = 0; i < size; i++) {
  +                           String id = oin.readUTF();
  +                           byte[] data = (byte[]) oin.readObject();
  +                           Session session = null; //todo readSession(data, id);
  +                           session.setManager(this);
  +                           add(session);
  +                       } //for
  +                       stateTransferred = true;
  +                       break;
  +                   }
  +                   case SessionMessage.EVT_SESSION_CREATED: {
  +                       Session session = createSession(false);
  +                       session.setManager(this);
  +                       session.setId(msg.getSessionID());
  +                       session.setNew(false);
  +                       break;
  +                   }
  +                   case SessionMessage.EVT_SESSION_EXPIRED: {
  +                       Session session = findSession(msg.getSessionID());
  +                       if (session != null) {
  +                           session.expire();
  +                           this.remove(session);
  +                       } //end if
  +                       break;
  +                   }
  +                   case SessionMessage.EVT_SESSION_ACCESSED: {
  +                       Session session = findSession(msg.getSessionID());
  +                       if (session != null) {
  +                           session.access();
  +                       }
  +                       break;
  +                   }
  +                   default: {
  +                       //we didn't recognize the message type, do nothing
  +                       break;
  +                   }
  +               } //switch
  +           }
  +           catch (Exception x) {
  +               log.error("Unable to receive message through TCP channel", x);
  +           }
  +       }
   
  -    }
   
   
  +    // -------------------------------------------------------- Private Methods
  +
  +    public void backgroundProcess() {
  +        processExpires();
  +    }
       /**
        * Invalidate all sessions that have expired.
        */
  @@ -874,6 +890,26 @@
           }
           long timeEnd = System.currentTimeMillis();
           processingTime += ( timeEnd - timeNow );
  +
  +    }
  +    public boolean getStateTransferred() {
  +        return stateTransferred;
  +    }
  +    public void setStateTransferred(boolean stateTransferred) {
  +        this.stateTransferred = stateTransferred;
  +    }
  +    public SimpleTcpCluster getCluster() {
  +        return cluster;
  +    }
  +    public void setCluster(SimpleTcpCluster cluster) {
  +        this.cluster = cluster;
  +    }
  +
  +    public void load() {
  +
  +    }
  +
  +    public void unload() {
   
       }
   
  
  
  
  1.5       +8 -48      jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java
  
  Index: DeltaSession.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DeltaSession.java 18 Nov 2003 19:27:55 -0000      1.4
  +++ DeltaSession.java 12 Jan 2004 05:23:10 -0000      1.5
  @@ -757,50 +757,6 @@
   
   
       /**
  -     * Perform the internal processing required to passivate
  -     * this session.
  -     */
  -    public void passivate() {
  -
  -        // Notify ActivationListeners
  -        HttpSessionEvent event = null;
  -        String keys[] = keys();
  -        for (int i = 0; i < keys.length; i++) {
  -            Object attribute = getAttributeInternal(keys[i]);
  -            if (attribute instanceof HttpSessionActivationListener) {
  -                if (event == null)
  -                    event = new HttpSessionEvent(this);
  -                // FIXME: Should we catch throwables?
  -                
((HttpSessionActivationListener)attribute).sessionWillPassivate(event);
  -            }
  -        }
  -
  -    }
  -
  -
  -    /**
  -     * Perform internal processing required to activate this
  -     * session.
  -     */
  -    public void activate() {
  -
  -        // Notify ActivationListeners
  -        HttpSessionEvent event = null;
  -        String keys[] = keys();
  -        for (int i = 0; i < keys.length; i++) {
  -            Object attribute = getAttributeInternal(keys[i]);
  -            if (attribute instanceof HttpSessionActivationListener) {
  -                if (event == null)
  -                    event = new HttpSessionEvent(this);
  -                // FIXME: Should we catch throwables?
  -                
((HttpSessionActivationListener)attribute).sessionDidActivate(event);
  -            }
  -        }
  -
  -    }
  -
  -
  -    /**
        * Return the object bound with the specified name to the internal notes
        * for this session, or <code>null</code> if no such binding exists.
        *
  @@ -1304,6 +1260,10 @@
           if (value == null) {
               removeAttribute(name);
               return;
  +        }
  +
  +        if ( ! (value instanceof java.io.Serializable) ) {
  +            throw new IllegalArgumentException("Attribute ["+name+"] is not 
serializable");
           }
   
           // Validate our current state
  
  
  
  1.9       +27 -4     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicatedSession.java
  
  Index: ReplicatedSession.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/ReplicatedSession.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- ReplicatedSession.java    14 Nov 2003 21:32:23 -0000      1.8
  +++ ReplicatedSession.java    12 Jan 2004 05:23:10 -0000      1.9
  @@ -97,11 +97,13 @@
   import java.io.PrintWriter;
   import java.io.StringWriter;
   
  -public class ReplicatedSession extends org.apache.catalina.session.StandardSession {
  +public class ReplicatedSession extends org.apache.catalina.session.StandardSession
  +implements org.apache.catalina.cluster.ClusterSession{
   
       private transient Manager mManager = null;
       protected boolean isDirty = false;
       private transient long lastAccessWasDistributed = System.currentTimeMillis();
  +    private boolean isPrimarySession=true;
   
       public ReplicatedSession(Manager manager) {
           super(manager);
  @@ -237,6 +239,27 @@
   
           super.writeObjectData(stream);
   
  +    }
  +
  +
  +
  +
  +
  +    /**
  +     * returns true if this session is the primary session, if that is the
  +     * case, the manager can expire it upon timeout.
  +     * @return <code>true</code> if this session is the primary session
  +     */
  +    public boolean isPrimarySession() {
  +        return isPrimarySession;
  +    }
  +
  +    /**
  +     * Sets whether this is the primary session or not.
  +     * @param primarySession
  +     */
  +    public void setPrimarySession(boolean primarySession) {
  +        this.isPrimarySession=primarySession;
       }
   
   
  
  
  
  1.19      +6 -16     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java
  
  Index: SimpleTcpReplicationManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/SimpleTcpReplicationManager.java,v
  retrieving revision 1.18
  retrieving revision 1.19
  diff -u -r1.18 -r1.19
  --- SimpleTcpReplicationManager.java  9 Jan 2004 23:24:09 -0000       1.18
  +++ SimpleTcpReplicationManager.java  12 Jan 2004 05:23:10 -0000      1.19
  @@ -211,11 +211,7 @@
               super.unload();
           }
       }
  -    public void load() throws ClassNotFoundException, IOException {
  -        if ( !getDistributable() ) {
  -            super.load();
  -        }
  -    }
  +
       public int getDebug()
       {
           return this.debug;
  @@ -334,7 +330,7 @@
                   synchronized ( invalidatedSessions ) {
                       invalidatedSessions.remove(sessionId);
                       SessionMessage msg = new SessionMessage(name,
  -                    SessionMessage.EVT_SESSION_EXPIRED_WNOTIFY,
  +                    SessionMessage.EVT_SESSION_EXPIRED,
                       null,
                       sessionId);
                   return msg;
  @@ -568,11 +564,6 @@
           try  {
               log("Received SessionMessage of type="+msg.getEventTypeString(),3);
               log("Received SessionMessage sender="+sender,3);
  -            if (  !this.getDistributable() ) {
  -                log.warn("Received replication message, although this context["+
  -                         getName()+"] is not distributable. Ignoring message");
  -                return;
  -            }
               switch ( msg.getEventType() ) {
                   case SessionMessage.EVT_GET_ALL_SESSIONS: {
                       //get a list of all the session from this manager
  @@ -620,8 +611,7 @@
                       if ( getDebug()  > 5 ) log("Received replicated 
session="+session);
                       break;
                   }
  -                case SessionMessage.EVT_SESSION_EXPIRED_WNOTIFY:
  -                case SessionMessage.EVT_SESSION_EXPIRED_WONOTIFY: {
  +                case SessionMessage.EVT_SESSION_EXPIRED: {
                       Session session = findSession(msg.getSessionID());
                       if ( session != null ) {
                           session.expire();
  
  
  
  1.1                  
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java
  
  Index: DeltaRequest.java
  ===================================================================
  /*
   * $Header: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java,v
 1.1 2004/01/12 05:23:10 fhanik Exp $
   * $Revision: 1.1 $
   * $Date: 2004/01/12 05:23:10 $
   *
   * ====================================================================
   *
   * The Apache Software License, Version 1.1
   *
   * Copyright (c) 1999 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution, if
   *    any, must include the following acknowlegement:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowlegement may appear in the software itself,
   *    if and wherever such third-party acknowlegements normally appear.
   *
   * 4. The names "The Jakarta Project", "Tomcat", and "Apache Software
   *    Foundation" must not be used to endorse or promote products derived
   *    from this software without prior written permission. For written
   *    permission, please contact [EMAIL PROTECTED]
   *
   * 5. Products derived from this software may not be called "Apache"
   *    nor may "Apache" appear in their names without prior written
   *    permission of the Apache Group.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   *
   * [Additional notices, if required by prior licensing conditions]
   *
   */
  
  
  package org.apache.catalina.cluster.session;
  
  /**
   * This class is used to track the series of actions that happen when
   * a request is executed. These actions can then be replayed, in order,
   * against a <code>ClusterSession</code> by calling <code>execute</code>.
   * @author <a href="mailto:[EMAIL PROTECTED]">Filip Hanik</a>
   * @version 1.0
   */
  
  import java.util.Vector;
  import javax.servlet.http.HttpSession;
  import java.io.Serializable;
  import java.security.Principal;
  import org.apache.catalina.realm.GenericPrincipal;
  import org.apache.catalina.cluster.ClusterSession;
  
  
  public class DeltaRequest implements Serializable {
  
      public static final int TYPE_ATTRIBUTE = 0;
      public static final int TYPE_PRINCIPAL = 1;
      public static final int TYPE_ISNEW = 2;
      public static final int TYPE_MAXINTERVAL = 3;
  
      public static final int ACTION_SET = 0;
      public static final int ACTION_REMOVE = 1;
  
      public static final String NAME_PRINCIPAL = "__SET__PRINCIPAL__";
      public static final String NAME_MAXINTERVAL = "__SET__MAXINTERVAL__";
      public static final String NAME_ISNEW = "__SET__ISNEW__";
  
      private String sessionId;
      private Vector actions = new Vector();
      private boolean recordAllActions = false;
  
      public DeltaRequest(String sessionId, boolean recordAllActions) {
          this.sessionId = sessionId;
          this.recordAllActions = recordAllActions;
      }
  
  
      public void setAttribute(String name, Object value) {
          int action = (value==null)?ACTION_REMOVE:ACTION_SET;
          addAction(TYPE_ATTRIBUTE,action,name,value);
      }
  
      public void removeAttribute(String name) {
          int action = ACTION_REMOVE;
          addAction(TYPE_ATTRIBUTE,action,name,null);
      }
  
      public void setMaxInactiveInterval(int interval) {
          int action = ACTION_SET;
          addAction(TYPE_MAXINTERVAL,action,NAME_MAXINTERVAL,new Integer(interval));
      }
  
      public void setPrincipal(Principal p) {
          int action = (p==null)?ACTION_REMOVE:ACTION_SET;
          SerializablePrincipal sp = null;
          if ( p != null ) {
              sp = SerializablePrincipal.createPrincipal((GenericPrincipal)p);
          }
          addAction(TYPE_PRINCIPAL,action,NAME_PRINCIPAL,sp);
      }
  
      public void setNew(boolean n) {
          int action = ACTION_SET;
          addAction(TYPE_ISNEW,action,NAME_ISNEW,new Boolean(n));
      }
  
      protected void addAction(int type,
                               int action,
                               String name,
                               Object value) {
          AttributeInfo info = new AttributeInfo(type,action,name,value);
          //if we have already done something to this attribute, make sure
          //we don't send multiple actions across the wire
          if ( !recordAllActions) actions.remove(info);
          //add the action
          actions.addElement(info);
      }
  
      public void execute(ClusterSession session) {
          if ( !this.sessionId.equals( session.getId() ) )
              throw new java.lang.IllegalArgumentException("Session id mismatch, not 
executing the delta request");
          for ( int i=0; i<actions.size(); i++ ) {
              AttributeInfo info = (AttributeInfo)actions.get(i);
              switch ( info.getType() ) {
                  case TYPE_ATTRIBUTE: {
                      if ( info.getAction() == ACTION_SET )
                          session.setAttribute(info.getName(),info.getValue());
                      else
                          session.removeAttribute(info.getName());
                      break;
                  }//case
                  case TYPE_ISNEW: {
                      session.setNew(((Boolean)info.getValue()).booleanValue());
                      break;
                  }//case
                  case TYPE_MAXINTERVAL: {
                      
session.setMaxInactiveInterval(((Integer)info.getValue()).intValue());
                      break;
                  }//case
                  case TYPE_PRINCIPAL: {
                      Principal p = null;
                      if ( info.getAction() == ACTION_SET ) {
                          SerializablePrincipal sp = 
(SerializablePrincipal)info.getValue();
                          p = 
(Principal)sp.getPrincipal(session.getManager().getContainer().getRealm());
                      }
                      session.setPrincipal(p);
                      break;
                  }//case
                  default : throw new java.lang.IllegalArgumentException("Invalid 
attribute info type="+info);
              }//switch
          }//for
      }
  
  
      public static class AttributeInfo implements java.io.Serializable {
          private String name = null;
          private Object value = null;
          private int action;
          private int type;
  
          public AttributeInfo(int type,
                               int action,
                               String name,
                               Object value) {
              this.name = name;
              this.value = value;
              this.action = action;
              this.type = type;
          }
  
          public int getType() {
              return type;
          }
  
          public int getAction() {
              return action;
          }
  
          public Object getValue() {
              return value;
          }
          public int hashCode() {
              return name.hashCode();
          }
  
          public String getName() {
              return name;
          }
  
          public boolean equals(Object o) {
              if ( ! (o instanceof AttributeInfo ) ) return false;
              AttributeInfo other =  (AttributeInfo)o;
              return other.getName().equals(this.getName());
          }
  
      }
  
  }
  
  
  
  1.7       +15 -6     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java
  
  Index: ReplicationValve.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationValve.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- ReplicationValve.java     15 Nov 2003 00:58:20 -0000      1.6
  +++ ReplicationValve.java     12 Jan 2004 05:23:11 -0000      1.7
  @@ -91,6 +91,7 @@
   import org.apache.catalina.cluster.session.SimpleTcpReplicationManager;
   import org.apache.catalina.cluster.SessionMessage;
   import org.apache.catalina.cluster.tcp.SimpleTcpCluster;
  +import org.apache.catalina.cluster.ClusterSession;
   
   /**
    * <p>Implementation of a Valve that logs interesting contents from the
  @@ -235,9 +236,17 @@
   
               if ( debug > 4 ) log("Invoking replication request on "+uri,4);
   
  +            ClusterSession cs = (ClusterSession)session;
               SessionMessage msg = manager.requestCompleted(id);
  -            if ( msg == null ) return;
  +            if ( (msg == null) && (!cs.isPrimarySession()) ) {
  +                msg = new SessionMessage(manager.getName(),
  +                                         SessionMessage.EVT_SESSION_ACCESSED,
  +                                         null,
  +                                         id);
  +            }
  +            cs.setPrimarySession(true);
   
  +            if ( msg == null ) return;
   
               cluster.send(msg);
               long stop = System.currentTimeMillis();
  @@ -255,7 +264,7 @@
        */
       public String toString() {
   
  -        StringBuffer sb = new StringBuffer("RequestDumperValve[");
  +        StringBuffer sb = new StringBuffer("ReplicationValve[");
           if (container != null)
               sb.append(container.getName());
           sb.append("]");
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to