User: simone
Date: 00/12/12 01:35:16
Added: src/main/org/jboss/ejb/plugins AbstractInstanceCache.java
Log:
Replaces old EnterpriseInstanceCache to stick to a common naming convention.
Many changes related to proper synchronization between the instance interceptor and
the passivation.
Improved logging of the passivation.
Added (remarked) useful debug code.
Extensively commented the source code.
Revision Changes Path
1.1 jboss/src/main/org/jboss/ejb/plugins/AbstractInstanceCache.java
Index: AbstractInstanceCache.java
===================================================================
/*
* JBoss, the OpenSource EJB server
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins;
import java.lang.reflect.Constructor;
import java.rmi.RemoteException;
import java.rmi.NoSuchObjectException;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import javax.ejb.EJBException;
import org.w3c.dom.Element;
import org.jboss.util.CachePolicy;
import org.jboss.util.Executable;
import org.jboss.util.WorkerQueue;
import org.jboss.util.Sync;
import org.jboss.util.Semaphore;
import org.jboss.ejb.InstanceCache;
import org.jboss.ejb.EnterpriseContext;
import org.jboss.ejb.DeploymentException;
import org.jboss.ejb.Container;
import org.jboss.metadata.MetaData;
import org.jboss.metadata.XmlLoadable;
import org.jboss.logging.Logger;
/**
* Base class for caches of entity and stateful beans. <p>
* It manages the cache entries through a {@link CachePolicy} object;
* the implementation of the cache policy object must respect the following
* requirements:
* <ul>
* <li> Have a public constructor that takes a single argument of type
* AbstractInstanceCache.class or a subclass
* </ul>
*
* @author Simone Bordet ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public abstract class AbstractInstanceCache
implements InstanceCache, XmlLoadable
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
/* The object that is delegated to implement the desired caching policy */
private CachePolicy m_cache;
/* The worker queue that passivates beans in another thread */
private WorkerQueue m_passivator;
/* The mutex object for the cache */
private Object m_cacheLock = new Object();
/* Helper class that handles synchronization for the passivation thread */
private PassivationHelper m_passivationHelper;
/* Map that holds mutexes used to sync the passivation with other activities */
private Map m_lockMap = new HashMap();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
/* From InstanceCache interface */
public EnterpriseContext get(Object id)
throws RemoteException, NoSuchObjectException
{
if (id == null) throw new IllegalArgumentException("Can't get an
object with a null key");
EnterpriseContext ctx = null;
synchronized (getCacheLock())
{
ctx = (EnterpriseContext)getCache().get(id);
if (ctx == null)
{
// Here I block if the bean is passivating now
ctx = unschedulePassivation(id);
// Already passivated ?
if (ctx == null)
{
try
{
ctx = acquireContext();
setKey(id, ctx);
activate(ctx);
insert(ctx);
// Logger is very time expensive. Turn
on only for debug
// Logger.debug("Activated bean " +
getContainer().getBeanMetaData().getEjbName() + " with id = " + id);
}
catch (Exception x)
{
freeContext(ctx);
throw new
NoSuchObjectException(x.getMessage());
}
}
else
{
insert(ctx);
}
}
}
if (ctx == null) throw new NoSuchObjectException("Can't find bean with
id = " + id);
return ctx;
}
/* From InstanceCache interface */
public void insert(EnterpriseContext ctx)
{
if (ctx == null) throw new IllegalArgumentException("Can't insert a
null object in the cache");
CachePolicy cache = getCache();
synchronized (getCacheLock())
{
// This call must be inside the sync block, otherwise can
happen that I get the
// key, then the context is passivated and I will insert in
cache a meaningless
// context.
Object key = getKey(ctx);
if (cache.peek(key) == null)
{
cache.insert(key, ctx);
}
else
{
// Here it is a bug.
// Check for all places where insert is called, and
ensure that they cannot
// run without having acquired the cache lock via
getCacheLock()
throw new IllegalStateException("INSERTING AN ALREADY
EXISTING BEAN, ID = " + key);
}
}
}
/* From InstanceCache interface */
public void release(EnterpriseContext ctx)
{
if (ctx == null) throw new IllegalArgumentException("Can't release a
null object");
// Here I remove the bean; call to remove(id) is wrong
// cause will remove also the cache lock that is needed
// by the passivation, that eventually will remove it.
synchronized (getCacheLock())
{
Object id = getKey(ctx);
if (getCache().peek(id) != null)
{
getCache().remove(id);
}
// This call, executed anyway, leaves door open to multiple
scheduling
// of the same context, which I take care in other places, in
// PassivationHelper.schedule. I'm not sure that moving the
call below
// just after getCache().remove above would not lead to other
// problems, so I leave it here.
schedulePassivation(ctx);
}
}
/* From InstanceCache interface */
public void remove(Object id)
{
if (id == null) throw new IllegalArgumentException("Can't remove an
object using a null key");
synchronized (getCacheLock())
{
if (getCache().peek(id) != null)
{
getCache().remove(id);
}
}
removeLock(id);
}
public boolean isActive(Object id)
{
// Check whether an object with the given id is available in the cache
return getCache().peek(id) != null;
}
/**
* Creates (if necessary) and returns an object used as mutex to sync
passivation
* activity with other activities, in particular incoming invoking calls. <br>
* The mutex is automatically removed when the corrispondent id is removed from
* the cache, either by passivation or by removal.
* This method must be synchronized with its dual, {@link #removeLock}.
*/
public synchronized Sync getLock(Object id)
{
Sync mutex = (Sync)m_lockMap.get(id);
if (mutex == null)
{
mutex = new Semaphore(1);
m_lockMap.put(id, mutex);
}
return mutex;
}
/**
* Removes the mutex associated with the given id.
* This method must be synchronized with its dual, {@link #getLock}.
*/
protected synchronized void removeLock(Object id)
{
Object mutex = m_lockMap.get(id);
if (mutex != null)
{
m_lockMap.remove(id);
}
}
// XmlLoadable implementation ----------------------------------------------
public void importXml(Element element) throws DeploymentException
{
// This one is mandatory
String p = MetaData.getElementContent(MetaData.getUniqueChild(element,
"cache-policy"));
try
{
Class cls =
Thread.currentThread().getContextClassLoader().loadClass(p);
Constructor ctor = cls.getConstructor(new Class[]
{AbstractInstanceCache.class});
m_cache = (CachePolicy)ctor.newInstance(new Object[] {this});
}
catch (Exception x)
{
throw new DeploymentException("Can't create cache policy", x);
}
Element policyConf = MetaData.getOptionalChild(element,
"cache-policy-conf");
if (policyConf != null)
{
if (m_cache instanceof XmlLoadable)
{
try
{
((XmlLoadable)m_cache).importXml(policyConf);
}
catch (Exception x)
{
throw new DeploymentException("Can't import
policy configuration", x);
}
}
}
}
/* From Service interface*/
public void init() throws Exception
{
getCache().init();
m_passivationHelper = new PassivationHelper();
String threadName = "Passivator Thread for " +
getContainer().getBeanMetaData().getEjbName();
ClassLoader cl = getContainer().getClassLoader();
m_passivator = new PassivatorQueue(threadName, cl);
}
/* From Service interface*/
public void start() throws Exception
{
getCache().start();
m_passivator.start();
}
/* From Service interface*/
public void stop()
{
// Empty the cache
synchronized (getCacheLock())
{
getCache().stop();
}
m_passivator.stop();
}
/* From Service interface*/
public void destroy()
{
getCache().destroy();
}
// Y overrides ---------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
/**
* Schedules the given EnterpriseContext for passivation
* @see PassivationHelper#schedule
*/
protected void schedulePassivation(EnterpriseContext ctx)
{
m_passivationHelper.schedule(ctx);
}
/**
* Tries to unschedule the given EnterpriseContext for passivation; returns
* the unscheduled context if it wasn't passivated yet, null if the
* passivation already happened.
* @see PassivationHelper#unschedule
*/
protected EnterpriseContext unschedulePassivation(Object id)
{
return m_passivationHelper.unschedule(id);
}
/**
* Returns the container for this cache.
*/
protected abstract Container getContainer();
/**
* Returns the cache policy used for this cache.
*/
protected CachePolicy getCache() {return m_cache;}
/**
* Returns the mutex used to sync access to the cache policy object
*/
public Object getCacheLock()
{
return m_cacheLock;
}
/**
* Passivates the given EnterpriseContext
*/
protected abstract void passivate(EnterpriseContext ctx) throws
RemoteException;
/**
* Activates the given EnterpriseContext
*/
protected abstract void activate(EnterpriseContext ctx) throws RemoteException;
/**
* Acquires an EnterpriseContext from the pool
*/
protected abstract EnterpriseContext acquireContext() throws Exception;
/**
* Frees the given EnterpriseContext to the pool
*/
protected abstract void freeContext(EnterpriseContext ctx);
/**
* Returns the key used by the cache to map the given context
*/
protected abstract Object getKey(EnterpriseContext ctx);
/**
* Sets the given id as key for the given context
*/
protected abstract void setKey(Object id, EnterpriseContext ctx);
/**
* Returns whether the given context can be passivated or not
*/
protected abstract boolean canPassivate(EnterpriseContext ctx);
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
/**
* Helper class that schedules, unschedules, and executes the passivation jobs.
*/
protected class PassivationHelper
{
/* The map that holds the passivation jobs posted */
private Map m_passivationJobs;
protected PassivationHelper()
{
m_passivationJobs = Collections.synchronizedMap(new HashMap());
}
/**
* Creates and schedules a {@link PassivationJob} for passivation
*/
protected void schedule(EnterpriseContext bean)
{
// Register only once the job to be able to unschedule its
passivation
Object key = getKey(bean);
if (m_passivationJobs.get(key) == null)
{
// Create the passivation job
PassivationJob job = new PassivationJob(bean)
{
private StringBuffer m_buffer = new
StringBuffer();
public void execute() throws Exception
{
EnterpriseContext ctx =
getEnterpriseContext();
Object id = getKey(ctx);
if (id == null)
{
// Here is a bug. Multiple
passivation requests for the
// same bean must be scheduled
only once.
throw new
IllegalStateException("Trying to passivate an already passivated bean");
}
/**
* Synchronization / Passivation
explanations:
* The instance interceptor (II) first
acquires the Sync object associated
* with the given id, then asks to the
instance cache (IC) for an enterprise
* context.
* The IC, if the context is not
present, activates it and returns it.
* If the context is not in the IC
then or it has already been
* passivated, or it is scheduled for
passivation. If the latter is true,
* then the job is canceled and
passivation does not occur, and the context
* is reinserted in the IC.
* This activity synchronizes in the
following order the following objects:
* 1) the Sync object associated to
the context's id via getLock(id)
* 2) the cache lock object via
getCacheLock()
* 3) the passivation job, since
cancel() is synchronized.
* To avoid deadlock, here we MUST
acquire these resources in the same
* exact order.
*/
Sync mutex = (Sync)getLock(id);
try
{
mutex.acquire();
synchronized (getCacheLock())
{
// This is absolutely
fundamental: the job must be removed from
// the map in every
case. If it remains there, the call to
//
PassivationHelper.unschedule() will cause the corrispondent
// context to be
reinserted in the cache, and if then is passivated
// we have a context
without meaning in the cache
m_passivationJobs.remove(id);
synchronized (this)
{
if
(!canPassivate(ctx))
{
//
This check is done because there could have been
// a
request for passivation of this bean, but before
//
being passivated it got a request and has already
//
been inserted in the cache by the cache
if
(getCache().peek(id) == null)
{
getCache().insert(id, ctx);
}
//
Logger is very time expensive.
log("Postponed passivation of bean ", id);
return;
}
else
{
if
(!isCancelled())
{
try
{
// If the next call throws RemoteException we can
// remove the context from the cache
passivate(ctx);
executed();
removeLock(id);
freeContext(ctx);
// Logger is very time expensive.
log("Passivated bean ", id);
}
catch (RemoteException x)
{
// Can't passivate this bean, remove it
// EJB 1.1, 6.4.1
getCache().remove(id);
throw x;
}
}
}
}
}
}
catch (InterruptedException ignored) {}
finally
{
mutex.release();
}
}
private void log(String msg, Object id)
{
m_buffer.setLength(0);
m_buffer.append(msg);
m_buffer.append(getContainer().getBeanMetaData().getEjbName());
m_buffer.append(" with id = ");
m_buffer.append(id);
Logger.debug(m_buffer.toString());
}
};
// Register job
m_passivationJobs.put(key, job);
// Schedule the job for passivation
m_passivator.putJob(job);
}
}
/**
* Tries to unschedule a job paired with the given context's id
* @return null if the bean has been passivated, the context
* paired with the given id otherwise
*/
protected EnterpriseContext unschedule(Object id)
{
// I chose not to remove canceled job here because multiple
// unscheduling requests can arrive. This way all will be
served
// Is the passivation job for id still to be executed ?
PassivationJob job = (PassivationJob)m_passivationJobs.get(id);
if (job != null)
{
// Still to execute or executing now, cancel the job
job.cancel();
// Sync to not allow method execute to be executed
after
// the if statement below but before the return
synchronized (job)
{
if (!job.isExecuted())
{
// Still to be executed, return the
bean
return job.getEnterpriseContext();
}
}
}
// Unscheduling request arrived too late, bean already
passivated
return null;
}
}
}
/**
* Abstract class for passivation jobs.
* Subclasses should implement {@link #execute} synchronizing it in some way because
* the execute method is normally called in the passivation thread,
* while the cancel method is normally called from another thread.
* To avoid that subclasses override methods of this class without
* make them synchronized (except execute of course), they're declared final.
*/
abstract class PassivationJob implements Executable
{
private EnterpriseContext m_context;
private boolean m_cancelled;
private boolean m_executed;
PassivationJob(EnterpriseContext ctx)
{
m_context = ctx;
}
public abstract void execute() throws Exception;
/**
* Returns the EnterpriseContext associated with this passivation job,
* so the bean that will be passivated.
* No need to synchronize access to this method, since the returned
* reference is immutable
*/
final EnterpriseContext getEnterpriseContext()
{
return m_context;
}
/**
* Mark this job for cancellation.
* @see #isCancelled
*/
final synchronized void cancel()
{
m_cancelled = true;
}
/**
* Returns whether this job has been marked for cancellation
* @see #cancel
*/
final synchronized boolean isCancelled()
{
return m_cancelled;
}
/**
* Mark this job as executed
* @see #isExecuted
*/
final synchronized void executed()
{
m_executed = true;
}
/**
* Returns whether this job has been executed
* @see #executed
*/
final synchronized boolean isExecuted()
{
return m_executed;
}
}
class PassivatorQueue extends WorkerQueue
{
/**
* Used for debug purposes, holds the scheduled passivation jobs
*/
// private Map m_map = new HashMap();
/**
* Creates a new passivator queue with default thread name of
* "Passivator Thread".
*/
PassivatorQueue()
{
this("Passivator Thread", null);
}
/**
* Creates a new passivator queue with the given thread name and given
* context class loader. <br>
* @param threadName the name of the passivator thread
* @param cl the context class loader; if null the context class loader is not
set.
*/
PassivatorQueue(String threadName, ClassLoader cl)
{
super(threadName);
if (cl != null)
{
m_queueThread.setContextClassLoader(cl);
}
}
/**
* Overridden for debug purposes
*//*
protected Executable getJobImpl() throws InterruptedException
{
PassivationJob j = (PassivationJob)super.getJobImpl();
EnterpriseContext ctx = j.getEnterpriseContext();
Object id = ctx.getId();
m_map.remove(id);
return j;
}
*/
/**
* Overridden for debug purposes
*//*
protected void putJobImpl(Executable job)
{
PassivationJob j = (PassivationJob)job;
EnterpriseContext ctx = j.getEnterpriseContext();
Object id = ctx.getId();
if (m_map.get(id) != null)
{
// Here is a bug, job requests are scheduled only once per
bean.
System.err.println("DUPLICATE PASSIVATION JOB INSERTION FOR ID
= " + ctx.getId());
System.err.println("CTX isLocked: " + ctx.isLocked());
System.err.println("CTX transaction: " + ctx.getTransaction());
throw new IllegalStateException();
}
else
{
m_map.put(id, job);
}
super.putJobImpl(job);
}
*/
/**
* Logs exceptions thrown during job execution.
*/
protected void logJobException(Exception x)
{
// Log system exceptions
if (x instanceof EJBException)
{
Logger.error("BEAN EXCEPTION:"+x.getMessage());
Exception nestedX = ((EJBException)x).getCausedByException();
if (nestedX != null)
{
Logger.exception(nestedX);
}
} else {
Logger.exception(x);
}
}
}