Hey
> <THIS IS CRAZY>
<snip>
Hm.. yeah, that was new to me.. not sure yet if it's counterintuitive or
not...
aanyway, let's do it another way then :-) Simply replace the mutex with a
semaphore. I have done this now, but due to my limited dev environment at
the moment I can't test it (not even compile it). I have attached it below,
so if it looks ok to you Sebastien can put it in and test it with the stress
tests.
The basic idea is to use semaphore primitives instead of a code section
mutex. This allows the synchronized blocks to be short instead of long (i.e.
synchronization is only in the semaphore primitives), and it is easier to
interleave with synchronized blocks. See code for details.
The lock and unlock primitives are at the end. These should be moved to the
cache so that they can be used by the passivation thread as well.
Seems ok? Have I missed something in this new design/code?
/Rickard
------------------
/*
* jBoss, the OpenSource EJB server
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins;
import java.lang.reflect.Method;
import java.rmi.RemoteException;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import javax.ejb.EJBObject;
import javax.ejb.CreateException;
import javax.ejb.EJBException;
import javax.ejb.NoSuchEntityException;
import javax.ejb.RemoveException;
import javax.ejb.EntityBean;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import org.jboss.ejb.Container;
import org.jboss.ejb.EntityContainer;
import org.jboss.ejb.EntityPersistenceManager;
import org.jboss.ejb.EntityEnterpriseContext;
import org.jboss.ejb.EnterpriseContext;
import org.jboss.ejb.InstanceCache;
import org.jboss.ejb.InstancePool;
import org.jboss.ejb.MethodInvocation;
import org.jboss.ejb.CacheKey;
import org.jboss.metadata.EntityMetaData;
import org.jboss.logging.Logger;
/**
* This container acquires the given instance.
*
* @see <related>
* @author Rickard �berg ([EMAIL PROTECTED])
* @author <a href="[EMAIL PROTECTED]">Marc Fleury</a>
* @version $Revision: 1.20 $
*/
public class EntityInstanceInterceptor
extends AbstractInterceptor
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
protected EntityContainer container;
protected boolean cacheLocked = false; // Cache lock semaphore
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
public void setContainer(Container container)
this.container = (EntityContainer)container;
}
public Container getContainer()
{
return container;
}
// Interceptor implementation --------------------------------------
public Object invokeHome(MethodInvocation mi)
throws Exception
{
// Get context
EnterpriseContext ctx =
((EntityContainer)getContainer()).getInstancePool().get();
mi.setEnterpriseContext(ctx);
// It is a new context for sure so we can lock it
ctx.lock();
try
{
// Invoke through interceptors
return getNext().invokeHome(mi);
} finally
{
// Always unlock, no matter what
ctx.unlock();
// Still free? Not free if create() was called successfully
if (ctx.getId() == null)
{
container.getInstancePool().free(ctx);
}
else
{
// DEBUG Logger.debug("Entity was created; not returned to
pool");
synchronized (ctx)
{
file://Let the waiters know
ctx.notifyAll();
}
}
}
}
public Object invoke(MethodInvocation mi)
throws Exception
{
// The id store is2 a CacheKey in the case of Entity
CacheKey key = (CacheKey) mi.getId();
// Get cache
EnterpriseInstanceCache cache =
(EnterpriseInstanceCache)container.getInstanceCache();
Object mutex = cache.getLock(key);
EnterpriseContext ctx = null;
// We synchronize the locking logic (so that the invoke is unsynchronized
and can be reentrant)
// Lock cache usage section with the cache semaphore
lock();
try
{
do
{
// Get context
ctx = cache.get(key);
// Do we have a running transaction with the context
if (ctx.getTransaction() != null &&
// And are we trying to enter with another transaction, or no
transaction
(mi.getTransaction() == null ||
!ctx.getTransaction().equals(mi.getTransaction())))
{
// Let's put the thread to sleep a lock release will wake the thread
// Possible deadlock
Transaction tx = ctx.getTransaction();
Logger.log("LOCKING-WAITING (TRANSACTION) for id "+ctx.getId()+"
ctx.hash "+ctx.hashCode()+" tx:"+((tx == null) ? "null" : tx.toString()));
synchronized (ctx)
{
// Unlock cache semaphore
unlock();
try
{ctx.wait(100);
}
catch (InterruptedException ie)
{
}
}
// Lock cache semaphore again
lock();
// Try your luck again
ctx = null;
} else
{
// If we get here it's the right tx, or no tx
if (!ctx.isLocked())
{
file://take it!
ctx.lock();
}
else
{
if (!isCallAllowed(mi))
{
// Go to sleep and wait for the lock to be released
// This is not one of the "home calls" so we need to wait for the
lock
synchronized (ctx)
{
// Unlock cache semaphore
unlock();
// Possible deadlock
Logger.log("LOCKING-WAITING (CTX) for id "+ctx.getId()+" ctx.hash
"+ctx.hashCode());
try
{ctx.wait(100);
}
catch (InterruptedException ie)
{
}
}
// Lock cache semaphore
lock();
// Try your luck again
ctx = null;
// Not allowed reentrant call
file://throw new RemoteException("Reentrant call");
}
else
{
file://We are in a home call so take the lock, take it!
ctx.lock();
}
}
}
} while (ctx == null);
} finally
{
// Unlock cache semaphore
unlock();
}
// Set context on the method invocation
mi.setEnterpriseContext(ctx);
try
{
// Go on, you won
return getNext().invoke(mi);
}
catch (RemoteException e)
{
// Discard instance
// EJB 1.1 spec 12.3.1
cache.remove(key);
throw e;
} catch (RuntimeException e)
{
// Discard instance
// EJB 1.1 spec 12.3.1
cache.remove(key);
throw e;
} catch (Error e)
{
// Discard instance
// EJB 1.1 spec 12.3.1
cache.remove(key);
throw e;
} finally
{
// Logger.debug("Release instance for "+id);
if (ctx != null)
{
// Lock cache
lock();
try
{
// unlock the context
ctx.unlock();
if (ctx.getId() == null)
{
// Work only if no transaction was encapsulating this remove()
if (ctx.getTransaction() == null)
{
// Remove from cache
cache.remove(key);
// It has been removed -> send to the pool
container.getInstancePool().free(ctx);
}
}
// notify the thread waiting on ctx
synchronized (ctx)
{ ctx.notifyAll();
}
} finally
{
// Unlock cache semaphore
unlock();
}
}
}
}
// Private --------------------------------------------------------
private static Method getEJBHome;
private static Method getHandle;
private static Method getPrimaryKey;
private static Method isIdentical;
private static Method remove;
static
{
try
{
Class[] noArg = new Class[0];
getEJBHome = EJBObject.class.getMethod("getEJBHome", noArg);
getHandle = EJBObject.class.getMethod("getHandle", noArg);
getPrimaryKey = EJBObject.class.getMethod("getPrimaryKey", noArg);
isIdentical = EJBObject.class.getMethod("isIdentical", new Class[]
{EJBObject.class
});
remove = EJBObject.class.getMethod("remove", noArg);
}
catch (Exception x)
{x.printStackTrace();
}
}
private boolean isCallAllowed(MethodInvocation mi)
{
boolean reentrant =
((EntityMetaData)container.getBeanMetaData()).isReentrant();
if (reentrant)
{
return true;
}
else
{
Method m = mi.getMethod();
if (m.equals(getEJBHome) ||
m.equals(getHandle) ||
m.equals(getPrimaryKey) ||
m.equals(isIdentical) ||
m.equals(remove))
{
return true;
}
}
return false;
}
// Semaphore to lock cache usage
// RO: This should be moved to cache to allow other users of
// cache (e.g. passivation thread) to use it as well
private synchronized void lock()
{
while (cacheLocked)
{
try
{
wait();
} catch (InterruptedException e)
{
// Ignore
}
}
cacheLocked = true;
}
// Semaphore to unlock cache usage
// RO: This should be moved to cache to allow other users of
// cache (e.g. passivation thread) to use it as well
private synchronized void unlock()
{
cacheLocked = false;
this.notify(); // Notify one thread that cache semaphore is now
unlocked
// RO: Does this have to be notifyAll()??
}
}