Adds a thread-safe latch to the DeltaManager, which is used to block
processing of cluster messages until local applications have
completed initialization.
Includes changes to the DeltaManager to create the latch based on
configuration, changes to Catalina to automatically open the latch
when initialization is complete, a constant used as the
attribute key in the ServletContext, and a modification to the
mbeans-descriptors for the new DeltaManager attribute.
Also includes a handful of style cleanups interspersed throughout
(spelling, removing compiler warnings about type checking, removing
spaces before semicolons, and removing blank lines before and after
braces).
- Jason
"That's the problem. He's a brilliant lunatic and you can't tell
which way he'll jump --
like his game he's impossible to analyse --
you can't dissect him, predict him --
which of course means he's not a lunatic at all."
Index: catalina/Globals.java
===================================================================
--- catalina/Globals.java (revision 719433)
+++ catalina/Globals.java (working copy)
@@ -36,6 +36,14 @@
"org.apache.catalina.deploy.alt_dd";
/**
+ * The servlet context attribute under which we store the concurrent latch
+ * used to block processing cluster messages until after local application
+ * initialization
+ */
+ public static final String CLUSTER_DELAY =
+ "org.apache.catalina.ha.delay";
+
+ /**
* The request attribute under which we store the array of X509Certificate
* objects representing the certificate chain presented by our client,
* if any.
Index: catalina/ha/session/DeltaManager.java
===================================================================
--- catalina/ha/session/DeltaManager.java (revision 719433)
+++ catalina/ha/session/DeltaManager.java (working copy)
@@ -26,11 +26,15 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;
import org.apache.catalina.Cluster;
import org.apache.catalina.Container;
import org.apache.catalina.Context;
import org.apache.catalina.Engine;
+import org.apache.catalina.Globals;
import org.apache.catalina.Host;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
@@ -38,13 +42,13 @@
import org.apache.catalina.Valve;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.ha.CatalinaCluster;
+import org.apache.catalina.ha.ClusterManager;
import org.apache.catalina.ha.ClusterMessage;
import org.apache.catalina.ha.tcp.ReplicationValve;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.io.ReplicationStream;
import org.apache.catalina.util.LifecycleSupport;
import org.apache.catalina.util.StringManager;
-import org.apache.catalina.ha.ClusterManager;
/**
* The DeltaManager manages replicated sessions by only replicating the deltas
@@ -62,10 +66,11 @@
* @author Craig R. McClanahan
* @author Jean-Francois Arcand
* @author Peter Rossbach
+ * @author Jason Lunn
* @version $Revision$ $Date$
*/
-public class DeltaManager extends ClusterManagerBase{
+public class DeltaManager extends ClusterManagerBase {
// ---------------------------------------------------- Security Classes
public static org.apache.juli.logging.Log log =
org.apache.juli.logging.LogFactory.getLog(DeltaManager.class);
@@ -106,6 +111,18 @@
protected LifecycleSupport lifecycle = new LifecycleSupport(this);
/**
+ * Flag indicating that messageReceived(ClusterMessage) should block until
+ * all local applications have completed initialization
+ */
+ protected boolean delay = false;
+
+ /**
+ * Barrier used to receive notification from other threads that it is okay
+ * to process incoming messages from the cluster
+ */
+ private CountDownLatch gate = null;
+
+ /**
* The maximum number of active Sessions allowed, or -1 for no limit.
*/
private int maxActiveSessions = -1;
@@ -121,8 +138,8 @@
/**
* wait time between send session block (default 2 sec)
*/
- private int sendAllSessionsWaitTime = 2 * 1000 ;
- private ArrayList receivedMessageQueue = new ArrayList() ;
+ private int sendAllSessionsWaitTime = 2 * 1000;
+ private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ;
private boolean receiverQueue = false ;
private boolean stateTimestampDrop = true ;
private long stateTransferCreateSendTime;
@@ -175,6 +192,27 @@
public String getName() {
return name;
}
+
+ /**
+ * Set the flag that indicates message processing should wait for local
+ * initialization of applications to complete
+ *
+ * @param delay If true, processing in messageReceived(ClusterMessage) will
+ * block until local web applications have completed initialization.
+ */
+ public void setDelay ( boolean delay ) {
+ this.delay = delay;
+ }
+
+ /**
+ * Gets the delay flag
+ * @return delay - boolean flag indicating that
+ * messageReceived(ClusterMessage) should block until local initialization
+ * of applications has completed
+ */
+ public boolean getDelay () {
+ return delay;
+ }
/**
* @return Returns the counterSend_EVT_GET_ALL_SESSIONS.
@@ -781,6 +819,24 @@
lifecycle.removeLifecycleListener(listener);
}
+ /**
+ * If this.delay is true, create a gate that will be opened when
+ * local application initialization is complete
+ */
+ @Override
+ public void init () {
+ if (delay && container != null) {
+ if (container instanceof StandardContext) {
+ ServletContext servletContext = ((StandardContext) container).getServletContext();
+ if (servletContext != null) {
+ gate = new CountDownLatch(1);
+ servletContext.setAttribute(Globals.CLUSTER_DELAY, gate);
+ }
+ }
+ }
+ super.init();
+ }
+
/**
* Prepare for the beginning of active use of the public methods of this
* component. This method should be called after <code>configure()</code>,
@@ -885,8 +941,8 @@
waitForSendAllSessions(beforeSendTime);
} finally {
synchronized(receivedMessageQueue) {
- for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
- SessionMessage smsg = (SessionMessage) iter.next();
+ for (Iterator<SessionMessage> iter = receivedMessageQueue.iterator(); iter.hasNext();) {
+ SessionMessage smsg = iter.next();
if (!stateTimestampDrop) {
messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
@@ -1068,6 +1124,18 @@
* the message received.
*/
public void messageDataReceived(ClusterMessage cmsg) {
+ // Block processing until local application initialization has
+ // completed, if a gate has been erected
+ CountDownLatch localGate = gate;
+ if(localGate != null) {
+ try {
+ localGate.await();
+ }
+ catch(InterruptedException e) {
+ log.error(e, e);
+ }
+ }
+
if (cmsg != null && cmsg instanceof SessionMessage) {
SessionMessage msg = (SessionMessage) cmsg;
switch (msg.getEventType()) {
@@ -1535,7 +1603,8 @@
result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ;
result.receiverQueue = receiverQueue ;
result.stateTimestampDrop = stateTimestampDrop ;
- result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+ result.stateTransferCreateSendTime = stateTransferCreateSendTime;
+ result.delay = delay;
return result;
}
}
Index: catalina/ha/session/mbeans-descriptors.xml
===================================================================
--- catalina/ha/session/mbeans-descriptors.xml (revision 719433)
+++ catalina/ha/session/mbeans-descriptors.xml (working copy)
@@ -268,7 +268,7 @@
<attribute
name="expireSessionsOnShutdown"
is="true"
- description="exipre all sessions cluster wide as one node goes down"
+ description="expire all sessions cluster wide as one node goes down"
type="boolean"/>
<attribute
name="notifyListenersOnReplication"
@@ -293,6 +293,10 @@
name="sendAllSessionsWaitTime"
description="wait time between send session block (default 2 sec)"
type="int"/>
+ <attribute
+ name="delay"
+ description="wait until local applications have initialized before processing cluster messages"
+ type="boolean"/>
<operation
name="listSessionIds"
description="Return the list of active session ids"
Index: catalina/startup/Catalina.java
===================================================================
--- catalina/startup/Catalina.java (revision 719433)
+++ catalina/startup/Catalina.java (working copy)
@@ -28,11 +28,18 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.servlet.ServletContext;
import org.apache.catalina.Container;
+import org.apache.catalina.Globals;
import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.Server;
+import org.apache.catalina.Service;
+import org.apache.catalina.core.ContainerBase;
+import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardServer;
import org.apache.tomcat.util.digester.Digester;
import org.apache.tomcat.util.digester.Rule;
@@ -56,6 +63,7 @@
*
* @author Craig R. McClanahan
* @author Remy Maucherat
+ * @author Jason Lunn
* @version $Revision$ $Date$
*/
@@ -579,7 +587,18 @@
log.error("Catalina.start: ", e);
}
}
-
+
+ // Open any gates stored in ServletContexts to allow the processing of
+ // cluster messages once local applications have been initialized
+ Service [] services = server.findServices();
+ if ( services != null ) {
+ for ( Service service : services ) {
+ if ( service != null ) {
+ openContainerGates( service.getContainer() );
+ }
+ }
+ }
+
long t2 = System.nanoTime();
if(log.isInfoEnabled())
log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms");
@@ -601,7 +620,6 @@
await();
stop();
}
-
}
@@ -609,7 +627,6 @@
* Stop an existing server instance.
*/
public void stop() {
-
try {
// Remove the ShutdownHook first so that server.stop()
// doesn't get invoked twice
@@ -629,7 +646,6 @@
log.error("Catalina.stop", e);
}
}
-
}
@@ -637,9 +653,7 @@
* Await and shutdown.
*/
public void await() {
-
server.await();
-
}
@@ -647,15 +661,50 @@
* Print usage information for this application.
*/
protected void usage() {
-
System.out.println
("usage: java org.apache.catalina.startup.Catalina"
+ " [ -config {pathname} ]"
+ " [ -nonaming ] { start | stop }");
+ }
+
+ /**
+ * Traverses the argument container and decrements the CountDownLatch
+ * found in the servlet context with attribute key Globals.CLUSTER_DELAY
+ * if found
+ * @param container Possibly null Container instance
+ */
+ protected void openContainerGates ( Container container ) {
+ if (container == null) {
+ return;
+ }
+
+ if ( container instanceof StandardContext ) {
+ StandardContext context =
+ (StandardContext)container;
+ ServletContext servletContext =
+ context.getServletContext();
+ if (servletContext != null) {
+ Object contextAttribute = servletContext
+ .getAttribute(Globals.CLUSTER_DELAY);
+ if (contextAttribute != null &&
+ contextAttribute instanceof CountDownLatch) {
+ CountDownLatch gate =
+ (CountDownLatch) contextAttribute;
+ gate.countDown();
+ }
+ }
+ } else if ( container instanceof ContainerBase ) {
+ ContainerBase base = (ContainerBase)container;
+ Container [] containers = base.findChildren();
+ if ( containers != null ) {
+ for ( Container childContainer : containers ) {
+ openContainerGates( childContainer );
+ }
+ }
+ }
}
-
// --------------------------------------- CatalinaShutdownHook Inner Class
// XXX Should be moved to embedded !
@@ -709,6 +758,4 @@
top.setParentClassLoader(parentClassLoader);
}
-
-
}
On Fri, Nov 21, 2008 at 10:42 PM, Peter Rossbach <[EMAIL PROTECTED]> wrote:
Hi Jason,
Send us your implementation and let us review your stuff :-)
You can also register a ContextListener at DeltaManager.setContainer()
to control your latch.
Are you sure that the session sync message (GET ALL Session) is
received before the first request at the second node is processed?
I think your feature is an extension of the current receivedQueue
usage!
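The listener idea above could look roughly like the sketch below. It is
only an illustration: FakeContext, ContextEventListener, the "after_start"
event name and ListenerGatedManager are hypothetical stand-ins written for
this example, not Tomcat classes, and the real hook would have to use
whatever listener types the container actually provides.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical listener type, standing in for a container lifecycle listener.
interface ContextEventListener {
    void onEvent(String type);
}

// Hypothetical stand-in for a web application context that fires events.
class FakeContext {
    static final String AFTER_START = "after_start"; // assumed event name

    private final List<ContextEventListener> listeners =
            new CopyOnWriteArrayList<ContextEventListener>();

    void addListener(ContextEventListener listener) {
        listeners.add(listener);
    }

    // Application initialization would run here, then listeners are notified.
    void start() {
        for (ContextEventListener listener : listeners) {
            listener.onEvent(AFTER_START);
        }
    }
}

// Hypothetical manager that opens its own gate from a listener callback.
class ListenerGatedManager {
    private final CountDownLatch gate = new CountDownLatch(1);

    // Analogous to hooking in when the manager is handed its container.
    void setContainer(FakeContext context) {
        context.addListener(type -> {
            if (FakeContext.AFTER_START.equals(type)) {
                gate.countDown();
            }
        });
    }

    boolean isOpen() {
        return gate.getCount() == 0;
    }

    // Message handling would call this before touching session data.
    void awaitOpen() throws InterruptedException {
        gate.await();
    }
}

class ListenerGateDemo {
    public static void main(String[] args) throws InterruptedException {
        FakeContext context = new FakeContext();
        ListenerGatedManager manager = new ListenerGatedManager();
        manager.setContainer(context);
        System.out.println("open before start: " + manager.isOpen());
        context.start();
        manager.awaitOpen(); // gate is already open, so this does not block
        System.out.println("open after start: " + manager.isOpen());
    }
}
```

With this shape the manager opens its own gate, so no separate walk over
the container tree is needed once startup has finished.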
Regards
Peter
On 20.11.2008 at 22:54, Jason wrote:
This message is targeted at Filip Hanik, Craig R. McClanahan,
Jean-Francois Arcand, Peter Rossbach or anyone with a direct interest in
the DeltaManager implementation in Tomcat 6.
A vendor (who will remain nameless) whose product I support for a client
recently gave me an idea for a patch to DeltaManager to address what the
vendor claims is a Tomcat specific issue related to session replication.
I'm wondering if it would be of value to the community or if the
"problem" it is trying to remedy is an intentional "feature".
The primary issue is that, according to vendor engineering support, the
other application containers the vendor supports deploying their product
on, including WebSphere, WebLogic, et al, wait until after local
applications have been initialized before processing incoming messages
from the cluster that could include deserializing remote sessions and the
objects therein. I have not confirmed this by examining the other
containers, mind you, but am pretty confident that this is an accurate
statement insofar as the vendor's product works in those environments but
does not work in a clustered Tomcat environment.
The reason it fails in Tomcat is that some of the objects in the
serialized session make calls at construction time to the vendor's
(archaic) preferences API's static methods, which are not initialized
properly until the web application itself is started. The result is that
the first node in the cluster starts up fine, but the 2nd-Nth nodes die a
horrible death trying to deserialize remote sessions populated by the
first node.
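A minimal sketch of that failure mode, under stated assumptions: Prefs and
CartItem are hypothetical stand-ins invented for this example (the vendor's
real API is not shown anywhere in this thread), and the call into the
static API is placed in a readObject hook, which is one way such a call can
run during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a static preferences API that is only usable
// after the web application has started.
class Prefs {
    private static volatile Map<String, String> values; // null until init

    static void init(Map<String, String> initial) {
        values = new HashMap<String, String>(initial);
    }

    static void reset() {
        values = null;
    }

    static String get(String key) {
        Map<String, String> current = values;
        if (current == null) {
            throw new IllegalStateException("preferences not initialized");
        }
        return current.get(key);
    }
}

// Hypothetical session attribute that touches Prefs while being restored.
class CartItem implements Serializable {
    private static final long serialVersionUID = 1L;

    final String sku;
    transient String currency;

    CartItem(String sku) {
        this.sku = sku;
        this.currency = Prefs.get("currency");
    }

    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Throws if the local application has not initialized Prefs yet.
        currency = Prefs.get("currency");
    }
}

class DeserializationOrderDemo {
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(value);
        out.close();
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data)
            throws IOException, ClassNotFoundException {
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(data));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> settings = new HashMap<String, String>();
        settings.put("currency", "USD");

        Prefs.init(settings);                     // "first node" is started
        byte[] replicated = serialize(new CartItem("sku-1"));

        Prefs.reset();                            // "second node", app not started
        try {
            deserialize(replicated);
        } catch (IllegalStateException e) {
            System.out.println("before init: " + e.getMessage());
        }

        Prefs.init(settings);                     // app has now started
        CartItem item = (CartItem) deserialize(replicated);
        System.out.println("after init: " + item.currency);
    }
}
```

The same bytes deserialize cleanly once the static state exists, which is
why delaying message processing until after local startup is enough.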
The workaround we've implemented locally is a simple one: we extend the
DeltaManager with a custom class. Therein, we create a latch
(java.util.concurrent.CountDownLatch, to be specific) and save it in the
ServletContext. The only overridden method is messageDataReceived(),
which uses the latch.await() method to block before calling the original
implementation of the parent messageDataReceived() method.
The vendor's application (or, more properly, the custom extensions we've
built on their platform) looks at the ServletContext for a latch after the
preferences have been initialized locally, and calls latch.countDown(),
allowing any blocked calls to messageDataReceived() to start executing
normally.
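In outline, the workaround described above might be sketched as follows.
This is not our production code: BaseManager is a hypothetical stand-in
for DeltaManager, a plain Map stands in for the ServletContext attributes,
and the attribute key is made up for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for DeltaManager; only the overridden method is modeled.
class BaseManager {
    final List<String> processed = new CopyOnWriteArrayList<String>();

    public void messageDataReceived(String message) {
        processed.add(message);
    }
}

// Subclass that holds cluster messages back until the latch is opened.
class GatedManager extends BaseManager {
    static final String LATCH_KEY = "example.cluster.latch"; // assumed key

    private final CountDownLatch latch = new CountDownLatch(1);

    GatedManager(Map<String, Object> contextAttributes) {
        // Publish the latch where the application can find it.
        contextAttributes.put(LATCH_KEY, latch);
    }

    @Override
    public void messageDataReceived(String message) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and skip the message in this sketch.
            Thread.currentThread().interrupt();
            return;
        }
        super.messageDataReceived(message);
    }
}

class GatedManagerDemo {
    // What the application does once its preferences are initialized.
    static void applicationStarted(Map<String, Object> contextAttributes) {
        Object attribute = contextAttributes.get(GatedManager.LATCH_KEY);
        if (attribute instanceof CountDownLatch) {
            ((CountDownLatch) attribute).countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> context = new ConcurrentHashMap<String, Object>();
        final GatedManager manager = new GatedManager(context);

        Thread receiver = new Thread(
                () -> manager.messageDataReceived("session-delta-1"));
        receiver.start();
        receiver.join(200); // the receiver is still parked on the latch
        System.out.println("processed before start: " + manager.processed.size());

        applicationStarted(context);
        receiver.join();
        System.out.println("processed after start: " + manager.processed.size());
    }
}
```

Calling countDown() more than once is harmless, so the application side
does not need to track whether the latch has already been opened.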
Without breaking the current sequence of initializing the session
replication code before local applications that Tomcat developers may have
come to expect, it seems like there is a potential solution here that
might enable applications like the one I've got to support to choose to
configure the session replication to wait to process incoming messages
until after the application has started.
I think it would be pretty trivial for me to offer a patch to DeltaManager
that created a latch based on a configuration element. One could imagine
an automatic mechanism for toggling the latch by the container after the
application initialization, or deferring to the application to deactivate.
The question is, does anybody want such functionality besides me? The
corollary is, if being able to choose when session replication begins is a
desirable feature, is this the right tactic to implement it?
Sincerely,
- Jason Lunn