User: simone
Date: 00/10/16 16:29:06
Added: src/main/org/jboss/ejb/plugins EnterpriseInstanceCache.java
Log:
Base class for passivating caches of stateful and entity beans
Revision Changes Path
1.1
jboss/src/main/org/jboss/ejb/plugins/EnterpriseInstanceCache.java
Index: EnterpriseInstanceCache.java
===================================================================
/*
* jBoss, the OpenSource EJB server
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins;
import java.lang.reflect.Constructor;
import java.rmi.RemoteException;
import java.rmi.NoSuchObjectException;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import org.w3c.dom.Element;
import org.jboss.util.CachePolicy;
import org.jboss.util.Executable;
import org.jboss.util.WorkerQueue;
import org.jboss.ejb.InstanceCache;
import org.jboss.ejb.EnterpriseContext;
import org.jboss.ejb.DeploymentException;
import org.jboss.ejb.Container;
import org.jboss.metadata.MetaData;
import org.jboss.metadata.XmlLoadable;
/**
* Base class for caches of entity and stateful beans. <p>
* It manages the cache entries through a {@link CachePolicy} object;
* the implementation of the cache policy object must respect the following
* requirements:
* <ul>
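* <li> Have a public constructor that takes a single argument declared with
* the exact type EnterpriseInstanceCache (the constructor is looked up
* reflectively by that parameter type, so a constructor declared with a
* subclass type is not found)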
* </ul>
*
* @author Simone Bordet ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public abstract class EnterpriseInstanceCache
implements InstanceCache, XmlLoadable
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
/* The object that is delegated to implement the desired caching policy;
 * it is instantiated reflectively by importXml */
private CachePolicy m_cache;
/* The worker queue that passivates beans in another thread; created by start() */
private WorkerQueue m_passivator;
/* The mutex object for the cache: every access to the cache policy object
 * made by this class happens while holding it */
private Object m_cacheLock = new Object();
/* Helper class that handles synchronization for the passivation thread; created by init() */
private PassivationHelper m_passivationHelper;
/* Min cache capacity, read from the optional MinimumCapacity element */
private int m_minCapacity;
/* Max cache capacity, read from the optional MaximumCapacity element */
private int m_maxCapacity;
/* Map (bean id -> mutex) that holds mutexes used to sync the passivation
 * with other activities; accessed only from synchronized methods */
private Map m_lockMap = new HashMap();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
/* From InstanceCache interface */
public EnterpriseContext get(Object id)
throws RemoteException, NoSuchObjectException
{
if (id == null) throw new IllegalArgumentException("Can't get an
object with a null key");
EnterpriseContext ctx = null;
synchronized (getCacheLock())
{
ctx = (EnterpriseContext)getCache().get(id);
}
if (ctx == null)
{
// Here I block if the bean is passivating now
ctx = unschedulePassivation(id);
// Already passivated ?
if (ctx == null)
{
try
{
ctx = acquireContext();
setKey(id, ctx);
activate(ctx);
}
catch (Exception x)
{
freeContext(ctx);
throw new
NoSuchObjectException(x.getMessage());
}
}
insert(ctx);
}
if (ctx == null) throw new NoSuchObjectException("Can't find bean with
id = " + id);
return ctx;
}
/* From InstanceCache interface */
public void insert(EnterpriseContext ctx)
{
if (ctx == null) throw new IllegalArgumentException("Can't insert a
null object in the cache");
Object key = getKey(ctx);
synchronized (getCacheLock())
{
// Paranoid check...
if (getCache().peek(key) == null)
{
getCache().insert(key, ctx);
}
else
{
throw new IllegalStateException("Can't insert bean
with id = " + key + ": it is already in the cache.");
}
}
}
/* From InstanceCache interface */
public void release(EnterpriseContext ctx)
{
if (ctx == null) throw new IllegalArgumentException("Can't release a
null object");
remove(getKey(ctx));
schedulePassivation(ctx);
}
/* From InstanceCache interface */
public void remove(Object id)
{
if (id == null) throw new IllegalArgumentException("Can't remove an
object using a null key");
synchronized (getCacheLock())
{
// Paranoid check...
if (getCache().peek(id) != null)
{
getCache().remove(id);
}
else
{
throw new IllegalStateException("Can't remove bean
with id = " + id + ": it isn't in the cache.");
}
}
removeLock(id);
}
/**
 * Returns the object used as mutex to sync passivation activity with other
 * activities for the given id, creating and registering it on first request. <br>
 * The mutex is discarded when the corresponding id is removed from the cache.
 */
public synchronized Object getLock(Object id)
{
   Object existing = m_lockMap.get(id);
   if (existing != null)
   {
      return existing;
   }
   Object created = new Object();
   m_lockMap.put(id, created);
   return created;
}
/**
 * Discards the mutex registered for the given id; does nothing when no
 * mutex is registered for it.
 */
protected synchronized void removeLock(Object id)
{
   if (m_lockMap.get(id) == null)
   {
      return;
   }
   m_lockMap.remove(id);
}
/**
 * Returns the minimum capacity configured for the cache.
 */
public int getMinCapacity()
{
   return m_minCapacity;
}
/**
 * Returns the maximum capacity configured for the cache.
 */
public int getMaxCapacity()
{
   return m_maxCapacity;
}
// XmlLoadable implementation ----------------------------------------------
public void importXml(Element element) throws DeploymentException
{
String min =
MetaData.getElementContent(MetaData.getOptionalChild(element, "MinimumCapacity"));
String max =
MetaData.getElementContent(MetaData.getOptionalChild(element, "MaximumCapacity"));
try
{
if (min != null)
{
int s = Integer.parseInt(min);
if (s <= 0)
{
throw new DeploymentException("Min cache
capacity can't be <= 0");
}
m_minCapacity = s;
}
if (max != null)
{
int s = Integer.parseInt(max);
if (s <= 0)
{
throw new DeploymentException("Max cache
capacity can't be <= 0");
}
m_maxCapacity = s;
}
}
catch (NumberFormatException x)
{
throw new DeploymentException("Can't parse cache
configuration", x);
}
// This one is mandatory
String p = MetaData.getElementContent(MetaData.getUniqueChild(element,
"CachePolicy"));
try
{
Class cls =
Thread.currentThread().getContextClassLoader().loadClass(p);
Constructor ctor = cls.getConstructor(new Class[]
{EnterpriseInstanceCache.class});
m_cache = (CachePolicy)ctor.newInstance(new Object[] {this});
}
catch (Exception x)
{
throw new DeploymentException("Can't create cache policy", x);
}
Element policyConf = MetaData.getOptionalChild(element,
"cache-policy-conf");
if (policyConf != null)
{
if (m_cache instanceof XmlLoadable)
{
try
{
((XmlLoadable)m_cache).importXml(policyConf);
}
catch (Exception x)
{
throw new DeploymentException("Can't import
policy configuration", x);
}
}
}
}
/**
 * From Service interface. <br>
 * Initializes the cache policy, then creates the passivation helper.
 */
public void init() throws Exception
{
   CachePolicy policy = getCache();
   policy.init();
   m_passivationHelper = new PassivationHelper();
}
/**
 * From Service interface. <br>
 * Starts the cache policy, then creates and starts the worker queue that
 * runs the passivation jobs; the queue is named after the EJB name of the
 * container's bean.
 */
public void start() throws Exception
{
   getCache().start();

   String ejbName = getContainer().getBeanMetaData().getEjbName();
   WorkerQueue queue = new WorkerQueue("Passivator Thread for " + ejbName);
   m_passivator = queue;
   queue.start();
}
/**
 * From Service interface. <br>
 * Stops the cache policy while holding the cache lock, then stops the
 * passivation worker queue if one was created.
 */
public void stop()
{
   synchronized (getCacheLock())
   {
      getCache().stop();
   }
   // The worker queue is created by start(); it is still null when start()
   // never ran or failed before creating it, so stop() must not dereference it
   if (m_passivator != null)
   {
      m_passivator.stop();
   }
}
/**
 * From Service interface. <br>
 * Destroys the cache policy.
 */
public void destroy()
{
   CachePolicy policy = getCache();
   policy.destroy();
}
// Y overrides ---------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
/**
 * Posts a passivation job for the given EnterpriseContext by delegating to
 * the passivation helper.
 * @see PassivationHelper#schedule
 */
protected void schedulePassivation(EnterpriseContext ctx)
{
   PassivationHelper helper = m_passivationHelper;
   helper.schedule(ctx);
}
/**
 * Asks the passivation helper to unschedule the passivation of the bean
 * with the given id.
 * @return the unscheduled context if it wasn't passivated yet, null if the
 * passivation already happened
 * @see PassivationHelper#unschedule
 */
protected EnterpriseContext unschedulePassivation(Object id)
{
   PassivationHelper helper = m_passivationHelper;
   return helper.unschedule(id);
}
/**
 * Returns the container for this cache. <br>
 * start() reads the EJB name from the returned container's bean metadata to
 * name the passivation worker queue, so the container must be available by
 * the time start() is called.
 */
protected abstract Container getContainer();
/**
 * Returns the cache policy object this cache delegates to.
 */
protected CachePolicy getCache()
{
   return m_cache;
}
/**
 * Returns the object to synchronize on when accessing the cache policy object
 */
protected Object getCacheLock() {return m_cacheLock;}
/**
 * Passivates the given EnterpriseContext. <br>
 * Called by the passivation job; when it throws RemoteException the job
 * puts the context back in the cache and rethrows.
 */
protected abstract void passivate(EnterpriseContext ctx) throws
RemoteException;
/**
 * Activates the given EnterpriseContext. <br>
 * Called by get() on a freshly acquired context, after setKey; a failure is
 * reported to the caller of get() as NoSuchObjectException.
 */
protected abstract void activate(EnterpriseContext ctx) throws RemoteException;
/**
 * Acquires an EnterpriseContext from the pool. <br>
 * Called by get() when the requested bean is neither in the cache nor
 * waiting for passivation.
 */
protected abstract EnterpriseContext acquireContext() throws Exception;
/**
 * Frees the given EnterpriseContext to the pool. <br>
 * Called by the passivation job after a successful passivation, and by
 * get() when acquiring or activating a context fails.
 */
protected abstract void freeContext(EnterpriseContext ctx);
/**
 * Returns the key used by the cache to map the given context
 */
protected abstract Object getKey(EnterpriseContext ctx);
/**
 * Sets the given id as key for the given context. <br>
 * Called by get() on a freshly acquired context, before activate.
 */
protected abstract void setKey(Object id, EnterpriseContext ctx);
/**
 * Returns whether the given context can be passivated or not. <br>
 * The passivation job consults this before passivating; on a false result
 * it reinserts the context in the cache rather than passivating it.
 */
protected abstract boolean canPassivate(EnterpriseContext ctx);
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
/**
* Helper class that schedules, unschedules, and executes the passivation jobs.
*/
protected class PassivationHelper
{
/* The map that holds the passivation jobs posted */
private Map m_passivationJobs = Collections.synchronizedMap(new
HashMap());
/**
* Creates and schedules a {@link PassivationJob} for passivation
*/
protected void schedule(EnterpriseContext bean)
{
PassivationJob job = new PassivationJob(bean)
{
public synchronized void execute() throws Exception
{
EnterpriseContext ctx = getEnterpriseContext();
Object id = getKey(ctx);
Object mutex = getLock(id);
synchronized (mutex)
{
if (!canPassivate(ctx))
{
synchronized (getCacheLock())
{
getCache().insert(id,
ctx);
}
return;
}
else
{
if (!isCancelled())
{
try
{
// If the next
call throws RemoteException we can
// reinsert
the context meaningfully in the cache
passivate(ctx);
executed();
freeContext(ctx);
}
catch (RemoteException
x)
{
// Can't
passivate this bean, keep it in memory
// Reinsert it
in the cache
synchronized
(getCacheLock())
{
getCache().insert(id, ctx);
}
throw x;
}
finally
{
m_passivationJobs.remove(id);
}
}
}
}
}
};
m_passivationJobs.put(getKey(bean), job);
m_passivator.putJob(job);
}
/**
* Tries to unschedule a job paired with the given context's id
* @return null if the bean has been passivated, the context
* paired with the given id otherwise
*/
protected EnterpriseContext unschedule(Object id)
{
PassivationJob job = (PassivationJob)m_passivationJobs.get(id);
if (job != null)
{
// Still to execute or executing now, cancel the job
job.cancel();
// Sync to not allow method execute to be executed
after
// the if statement below but before the return
synchronized (job)
{
if (!job.isExecuted())
{
// Still to be executed, return the
bean
return job.getEnterpriseContext();
}
}
}
// Unscheduling request arrived too late, bean already
passivated
return null;
}
}
}
/**
 * Base class of the jobs that passivate a bean.
 * Subclasses implement {@link #execute} and are expected to synchronize it
 * in some way, because execute is normally called in the passivation thread
 * while cancel is normally called from another thread.
 * All the other methods are synchronized and declared final, so that a
 * subclass cannot replace them with unsynchronized versions.
 */
abstract class PassivationJob implements Executable
{
   /* The context of the bean this job passivates */
   private EnterpriseContext m_bean;
   /* Set once cancellation has been requested */
   private boolean m_cancelRequested;
   /* Set once the job has been marked as executed */
   private boolean m_done;

   PassivationJob(EnterpriseContext ctx)
   {
      this.m_bean = ctx;
   }

   public abstract void execute() throws Exception;

   /**
    * Returns the EnterpriseContext this job was created for, that is the
    * bean that will be passivated.
    */
   final synchronized EnterpriseContext getEnterpriseContext()
   {
      return this.m_bean;
   }

   /**
    * Requests the cancellation of this job.
    * @see #isCancelled
    */
   final synchronized void cancel()
   {
      this.m_cancelRequested = true;
   }

   /**
    * Tells whether the cancellation of this job has been requested
    * @see #cancel
    */
   final synchronized boolean isCancelled()
   {
      return this.m_cancelRequested;
   }

   /**
    * Records that this job has been executed
    * @see #isExecuted
    */
   final synchronized void executed()
   {
      this.m_done = true;
   }

   /**
    * Tells whether this job has been recorded as executed
    * @see #executed
    */
   final synchronized boolean isExecuted()
   {
      return this.m_done;
   }
}