djencks 2004/02/23 12:28:43
Modified: modules/connector/src/java/org/apache/geronimo/connector/work GeronimoWorkManager.java WorkerContext.java modules/connector/src/test/org/apache/geronimo/connector/work PooledWorkManagerTest.java modules/transaction/src/java/org/apache/geronimo/transaction TransactionManagerProxy.java TransactionProxy.java modules/transaction/src/java/org/apache/geronimo/transaction/manager TransactionImpl.java TransactionManagerImpl.java XidFactory.java XidImpl.java Added: modules/transaction/src/java/org/apache/geronimo/transaction XAWork.java modules/transaction/src/java/org/apache/geronimo/transaction/manager XidImporter.java Log: add remote tx importing support. no tests yet. Revision Changes Path 1.2 +17 -29 incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java Index: GeronimoWorkManager.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- GeronimoWorkManager.java 23 Jan 2004 05:56:11 -0000 1.1 +++ GeronimoWorkManager.java 23 Feb 2004 20:28:42 -0000 1.2 @@ -66,11 +66,10 @@ import org.apache.geronimo.connector.work.pool.StartWorkExecutorPool; import org.apache.geronimo.connector.work.pool.SyncWorkExecutorPool; import org.apache.geronimo.connector.work.pool.WorkExecutorPool; -import org.apache.geronimo.gbean.GAttributeInfo; import org.apache.geronimo.gbean.GBeanInfo; import org.apache.geronimo.gbean.GBeanInfoFactory; import org.apache.geronimo.gbean.GConstructorInfo; -import org.apache.geronimo.gbean.GOperationInfo; +import org.apache.geronimo.transaction.XAWork; /** * WorkManager implementation which uses under the cover three WorkExecutorPool @@ -90,8 +89,6 @@ private final static int DEFAULT_MIN_POOL_SIZE = 0; private final static int DEFAULT_MAX_POOL_SIZE = 10; - private static final GBeanInfo GBEAN_INFO; - /** * Pool of threads used by this WorkManager in order to process * the Work instances submitted via the doWork methods. @@ -110,21 +107,24 @@ */ private final WorkExecutorPool scheduledWorkExecutorPool; + private final XAWork xaWork; + /** * Create a WorkManager. */ public GeronimoWorkManager() { - this(DEFAULT_MIN_POOL_SIZE, DEFAULT_MAX_POOL_SIZE); + this(DEFAULT_MIN_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, null); } - public GeronimoWorkManager(int minSize, int maxSize) { - this(minSize, maxSize, minSize, maxSize, minSize, maxSize); + public GeronimoWorkManager(int minSize, int maxSize, XAWork xaWork) { + this(minSize, maxSize, minSize, maxSize, minSize, maxSize, xaWork); } - public GeronimoWorkManager(int syncMinSize, int syncMaxSize, int startMinSize, int startMaxSize, int schedMinSize, int schedMaxSize) { + public GeronimoWorkManager(int syncMinSize, int syncMaxSize, int startMinSize, int startMaxSize, int schedMinSize, int schedMaxSize, XAWork xaWork) { syncWorkExecutorPool = new SyncWorkExecutorPool(syncMinSize, syncMaxSize); startWorkExecutorPool = new StartWorkExecutorPool(startMinSize, startMaxSize); scheduledWorkExecutorPool = new ScheduleWorkExecutorPool(schedMinSize, schedMaxSize); + this.xaWork = xaWork; } public int getSyncThreadCount() { @@ -204,7 +204,7 @@ WorkListener workListener) throws WorkException { WorkerContext workWrapper = - new WorkerContext(work, startTimeout, execContext, workListener); + new WorkerContext(work, startTimeout, execContext, xaWork, workListener); workWrapper.setThreadPriority(Thread.currentThread().getPriority()); syncWorkExecutorPool.executeWork(workWrapper); } @@ -229,7 +229,7 @@ WorkListener workListener) throws WorkException { WorkerContext workWrapper = - new WorkerContext(work, startTimeout, execContext, workListener); + new WorkerContext(work, startTimeout, execContext, xaWork, workListener); workWrapper.setThreadPriority(Thread.currentThread().getPriority()); startWorkExecutorPool.executeWork(workWrapper); return System.currentTimeMillis() - workWrapper.getAcceptedTime(); @@ -254,31 +254,19 @@ WorkListener workListener) throws WorkException { WorkerContext workWrapper = - new WorkerContext(work, startTimeout, execContext, workListener); + new WorkerContext(work, startTimeout, execContext, xaWork, workListener); workWrapper.setThreadPriority(Thread.currentThread().getPriority()); scheduledWorkExecutorPool.executeWork(workWrapper); } + public static final GBeanInfo GBEAN_INFO; + static { GBeanInfoFactory infoFactory = new GBeanInfoFactory(GeronimoWorkManager.class.getName()); - infoFactory.addAttribute(new GAttributeInfo("SyncThreadCount", true)); - infoFactory.addAttribute(new GAttributeInfo("SyncMinimumPoolSize", true)); - infoFactory.addAttribute(new GAttributeInfo("SyncMaximumPoolSize", true)); - infoFactory.addAttribute(new GAttributeInfo("StartThreadCount", true)); - infoFactory.addAttribute(new GAttributeInfo("StartMinimumPoolSize", true)); - infoFactory.addAttribute(new GAttributeInfo("StartMaximumPoolSize", true)); - infoFactory.addAttribute(new GAttributeInfo("ScheduledThreadCount", true)); - infoFactory.addAttribute(new GAttributeInfo("ScheduledMinimumPoolSize", true)); - infoFactory.addAttribute(new GAttributeInfo("ScheduledMaximumPoolSize", true)); - infoFactory.addOperation(new GOperationInfo("doWork", new String[]{Work.class.getName()})); - infoFactory.addOperation(new GOperationInfo("doWork", new String[]{Work.class.getName(), Long.TYPE.getName(), ExecutionContext.class.getName(), WorkListener.class.getName()})); - infoFactory.addOperation(new GOperationInfo("startWork", new String[]{Work.class.getName()})); - infoFactory.addOperation(new GOperationInfo("startWork", new String[]{Work.class.getName(), Long.TYPE.getName(), ExecutionContext.class.getName(), WorkListener.class.getName()})); - infoFactory.addOperation(new GOperationInfo("scheduleWork", new String[]{Work.class.getName()})); - infoFactory.addOperation(new GOperationInfo("scheduleWork", new String[]{Work.class.getName(), Long.TYPE.getName(), ExecutionContext.class.getName(), WorkListener.class.getName()})); + infoFactory.addInterface(WorkManager.class, new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", "StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize", "ScheduledMaximumPoolSize", "XAWork"}); infoFactory.setConstructor(new GConstructorInfo( - new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", "StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize", "ScheduledMaximumPoolSize"}, - new Class[]{Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE})); + new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", "StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize", "ScheduledMaximumPoolSize", "XAWork"}, + new Class[]{Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, Integer.TYPE, XAWork.class})); GBEAN_INFO = infoFactory.getBeanInfo(); } 1.2 +22 -5 incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java Index: WorkerContext.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- WorkerContext.java 23 Jan 2004 05:56:11 -0000 1.1 +++ WorkerContext.java 23 Feb 2004 20:28:42 -0000 1.2 @@ -67,6 +67,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.geronimo.transaction.XAWork; import EDU.oswego.cs.dl.util.concurrent.Latch; @@ -118,7 +119,9 @@ /** * Execution context of the actual work to be executed. */ - private ExecutionContext executionContext; + private final ExecutionContext executionContext; + + private final XAWork xaWork; /** * Listener to be notified during the life-cycle of the work treatment. @@ -147,6 +150,8 @@ */ public WorkerContext(Work aWork) { adaptee = aWork; + executionContext = null; + xaWork = null; } /** @@ -162,11 +167,13 @@ * work completed) occur. */ public WorkerContext(Work aWork, long aStartTimeout, - ExecutionContext execContext, - WorkListener workListener) { + ExecutionContext execContext, + XAWork xaWork, + WorkListener workListener) { adaptee = aWork; startTimeOut = aStartTimeout; executionContext = execContext; + this.xaWork = xaWork; if (null != workListener) { this.workListener = workListener; } @@ -303,7 +310,17 @@ new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null)); startLatch.release(); try { - adaptee.run(); + if (executionContext == null || executionContext.getXid() == null) { + adaptee.run(); + } else { + try { + xaWork.begin(executionContext.getXid(), executionContext.getTransactionTimeout()); + adaptee.run(); + } finally { + xaWork.end(executionContext.getXid()); + } + + } workListener.workCompleted( new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee, null)); } catch (Throwable e) { 1.2 +2 -2 incubator-geronimo/modules/connector/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java Index: PooledWorkManagerTest.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/connector/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- PooledWorkManagerTest.java 23 Jan 2004 05:56:11 -0000 1.1 +++ PooledWorkManagerTest.java 23 Feb 2004 20:28:43 -0000 1.2 @@ -84,7 +84,7 @@ private static final int m_tempo = 200; protected void setUp() throws Exception { - m_workManager = new GeronimoWorkManager(1, 1); + m_workManager = new GeronimoWorkManager(1, 1, null); } public void testDoWork() throws Exception { 1.3 +98 -15 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionManagerProxy.java Index: TransactionManagerProxy.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionManagerProxy.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- TransactionManagerProxy.java 30 Jan 2004 01:32:00 -0000 1.2 +++ TransactionManagerProxy.java 23 Feb 2004 20:28:43 -0000 1.3 @@ -55,6 +55,11 @@ */ package org.apache.geronimo.transaction; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; + import javax.resource.spi.XATerminator; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; @@ -74,6 +79,7 @@ import org.apache.geronimo.gbean.GConstructorInfo; import org.apache.geronimo.gbean.GAttributeInfo; import org.apache.geronimo.transaction.manager.TransactionManagerImpl; +import org.apache.geronimo.transaction.manager.XidImporter; /** * A wrapper for a TransactionManager that wraps all Transactions in a TransactionProxy @@ -83,19 +89,23 @@ * * @version $Revision$ $Date$ */ -public class TransactionManagerProxy implements TransactionManager, XATerminator { +public class TransactionManagerProxy implements TransactionManager, XATerminator, XAWork { public static final GBeanInfo GBEAN_INFO; private final TransactionManager delegate; + private final XidImporter importer; private final ThreadLocal threadTx = new ThreadLocal(); + private final Map importedTransactions = new HashMap(); + private Set activeTransactions = new HashSet(); /** * Constructor taking the TransactionManager to wrap. * @param delegate the TransactionManager that should be wrapped */ - public TransactionManagerProxy(TransactionManager delegate) { + public TransactionManagerProxy(TransactionManager delegate, XidImporter importer) { this.delegate = delegate; + this.importer = importer; } /** @@ -103,6 +113,7 @@ */ public TransactionManagerProxy() { this.delegate = new TransactionManagerImpl(); + this.importer = null; } public void setTransactionTimeout(int timeout) throws SystemException { @@ -174,22 +185,55 @@ /** * @see javax.resource.spi.XATerminator#commit(javax.transaction.xa.Xid, boolean) */ - public void commit(Xid arg0, boolean arg1) throws XAException { - throw new XAException("Not implemented."); + public void commit(Xid xid, boolean onePhase) throws XAException { + TransactionProxy tx = (TransactionProxy) importedTransactions.remove(xid); + if (tx == null) { + throw new XAException("No imported transaction for xid: " + xid); + } + + try { + int status = tx.getStatus(); + assert status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARED; + } catch (SystemException e) { + throw new XAException(); + } + importer.commit(tx.getDelegate(), onePhase); } /** * @see javax.resource.spi.XATerminator#forget(javax.transaction.xa.Xid) */ - public void forget(Xid arg0) throws XAException { - throw new XAException("Not implemented."); + public void forget(Xid xid) throws XAException { + TransactionProxy tx = (TransactionProxy) importedTransactions.remove(xid); + if (tx == null) { + throw new XAException("No imported transaction for xid: " + xid); + } + + try { + int status = tx.getStatus(); + //assert status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARED; + } catch (SystemException e) { + throw new XAException(); + } + importer.forget(tx.getDelegate()); } /** * @see javax.resource.spi.XATerminator#prepare(javax.transaction.xa.Xid) */ - public int prepare(Xid arg0) throws XAException { - throw new XAException("Not implemented."); + public int prepare(Xid xid) throws XAException { + TransactionProxy tx = (TransactionProxy) importedTransactions.get(xid); + if (tx == null) { + throw new XAException("No imported transaction for xid: " + xid); + } + + try { + int status = tx.getStatus(); + assert status == Status.STATUS_ACTIVE; + } catch (SystemException e) { + throw new XAException(); + } + return importer.prepare(tx.getDelegate()); } /** @@ -202,8 +246,47 @@ /** * @see javax.resource.spi.XATerminator#rollback(javax.transaction.xa.Xid) */ - public void rollback(Xid arg0) throws XAException { - throw new XAException("Not implemented."); + public void rollback(Xid xid) throws XAException { + TransactionProxy tx = (TransactionProxy) importedTransactions.remove(xid); + if (tx == null) { + throw new XAException("No imported transaction for xid: " + xid); + } + + try { + int status = tx.getStatus(); + assert status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARED; + } catch (SystemException e) { + throw new XAException(); + } + importer.rollback(tx.getDelegate()); + } + + public void begin(Xid xid, long txTimeoutMillis) throws XAException { + TransactionProxy tx = (TransactionProxy) importedTransactions.get(xid); + if (tx == null) { + try { + tx = new TransactionProxy(importer.importXid(xid)); + } catch (SystemException e) { + throw (XAException)new XAException("Could not import xid").initCause(e); + } + importedTransactions.put(xid, tx); + } + if (activeTransactions.contains(tx)) { + throw new XAException("Xid already active"); + } + activeTransactions.add(tx); + threadTx.set(tx); + importer.setTransactionTimeout(txTimeoutMillis); + } + + public void end(Xid xid) throws XAException { + TransactionProxy tx = (TransactionProxy) importedTransactions.get(xid); + if (tx == null) { + throw new XAException("No imported transaction for xid: " + xid); + } + if (!activeTransactions.remove(tx)) { + throw new XAException("tx not active for xid: " + xid); + } } //for now we use the default constructor. @@ -211,17 +294,17 @@ GBeanInfoFactory infoFactory = new GBeanInfoFactory(TransactionManagerProxy.class.getName()); infoFactory.setConstructor(new GConstructorInfo( - new String[] { "Delegate" }, - new Class[] { TransactionManager.class })); + new String[]{"Delegate"}, + new Class[]{TransactionManager.class})); infoFactory.addAttribute(new GAttributeInfo("Delegate", true)); - infoFactory.addOperation(new GOperationInfo("setTransactionTimeout", new String[] {Integer.TYPE.getName()})); + infoFactory.addOperation(new GOperationInfo("setTransactionTimeout", new String[]{Integer.TYPE.getName()})); infoFactory.addOperation(new GOperationInfo("begin")); infoFactory.addOperation(new GOperationInfo("getStatus")); infoFactory.addOperation(new GOperationInfo("getTransaction")); infoFactory.addOperation(new GOperationInfo("suspend")); - infoFactory.addOperation(new GOperationInfo("resume", new String[] {Transaction.class.getName()})); + infoFactory.addOperation(new GOperationInfo("resume", new String[]{Transaction.class.getName()})); infoFactory.addOperation(new GOperationInfo("commit")); infoFactory.addOperation(new GOperationInfo("rollback")); infoFactory.addOperation(new GOperationInfo("setRollbackOnly")); 1.2 +5 -1 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionProxy.java Index: TransactionProxy.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionProxy.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- TransactionProxy.java 23 Jan 2004 18:54:15 -0000 1.1 +++ TransactionProxy.java 23 Feb 2004 20:28:43 -0000 1.2 @@ -105,4 +105,8 @@ public void setRollbackOnly() throws IllegalStateException, SystemException { delegate.setRollbackOnly(); } + + Transaction getDelegate() { + return delegate; + } } 1.1 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/XAWork.java Index: XAWork.java =================================================================== package org.apache.geronimo.transaction; import javax.transaction.xa.Xid; import javax.transaction.xa.XAException; /** * primarily an interface between the WorkManager/ExecutionContext and the tm. * * @version $Revision: 1.1 $ $Date: 2004/02/23 20:28:43 $ * * */ public interface XAWork { void begin(Xid xid, long txTimeout) throws XAException; void end(Xid xid) throws XAException; } 1.2 +204 -132 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionImpl.java Index: TransactionImpl.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionImpl.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- TransactionImpl.java 23 Jan 2004 18:54:16 -0000 1.1 +++ TransactionImpl.java 23 Feb 2004 20:28:43 -0000 1.2 @@ -63,6 +63,7 @@ import java.util.List; import java.util.Map; import java.util.Set; + import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.RollbackException; @@ -94,9 +95,13 @@ private Map xaResources = new HashMap(3); TransactionImpl(XidFactory xidFactory, TransactionLog txnLog) throws SystemException { + this(xidFactory.createXid(), xidFactory, txnLog); + } + + TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog) throws SystemException { this.xidFactory = xidFactory; this.txnLog = txnLog; - this.xid = xidFactory.createXid(); + this.xid = xid; try { txnLog.begin(xid); } catch (IOException e) { @@ -114,16 +119,16 @@ public synchronized void setRollbackOnly() throws IllegalStateException, SystemException { switch (status) { - case Status.STATUS_ACTIVE: - case Status.STATUS_PREPARING: - status = Status.STATUS_MARKED_ROLLBACK; - break; - case Status.STATUS_MARKED_ROLLBACK: - case Status.STATUS_ROLLING_BACK: - // nothing to do - break; - default: - throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status)); + case Status.STATUS_ACTIVE: + case Status.STATUS_PREPARING: + status = Status.STATUS_MARKED_ROLLBACK; + break; + case Status.STATUS_MARKED_ROLLBACK: + case Status.STATUS_ROLLING_BACK: + // nothing to do + break; + default: + throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status)); } } @@ -132,13 +137,13 @@ throw new IllegalArgumentException("Synchronization is null"); } switch (status) { - case Status.STATUS_ACTIVE: - case Status.STATUS_PREPARING: - break; - case Status.STATUS_MARKED_ROLLBACK: - throw new RollbackException("Transaction is marked for rollback"); - default: - throw new IllegalStateException("Status is " + getStateString(status)); + case Status.STATUS_ACTIVE: + case Status.STATUS_PREPARING: + break; + case Status.STATUS_MARKED_ROLLBACK: + throw new RollbackException("Transaction is marked for rollback"); + default: + throw new IllegalStateException("Status is " + getStateString(status)); } syncList.add(synch); } @@ -148,12 +153,12 @@ throw new IllegalArgumentException("XAResource is null"); } switch (status) { - case Status.STATUS_ACTIVE: - break; - case Status.STATUS_MARKED_ROLLBACK: - throw new RollbackException("Transaction is marked for rollback"); - default: - throw new IllegalStateException("Status is " + getStateString(status)); + case Status.STATUS_ACTIVE: + break; + case Status.STATUS_MARKED_ROLLBACK: + throw new RollbackException("Transaction is marked for rollback"); + default: + throw new IllegalStateException("Status is " + getStateString(status)); } try { @@ -196,11 +201,11 @@ throw new IllegalArgumentException("XAResource is null"); } switch (status) { - case Status.STATUS_ACTIVE: - case Status.STATUS_MARKED_ROLLBACK: - break; - default: - throw new IllegalStateException("Status is " + getStateString(status)); + case Status.STATUS_ACTIVE: + case Status.STATUS_MARKED_ROLLBACK: + break; + default: + throw new IllegalStateException("Status is " + getStateString(status)); } ResourceManager manager = (ResourceManager) xaResources.remove(xaRes); if (manager == null) { @@ -215,27 +220,21 @@ } } + //Transaction method, does 2pc public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException { - synchronized (this) { - switch (status) { - case Status.STATUS_ACTIVE: - case Status.STATUS_MARKED_ROLLBACK: - break; - default: - throw new IllegalStateException("Status is " + getStateString(status)); - } - } + beforePrepare(); - beforeCompletion(); - endResources(); try { - LinkedList rms; + if (status == Status.STATUS_MARKED_ROLLBACK) { + rollbackResources(resourceManagers); + throw new RollbackException("Unable to commit"); + } synchronized (this) { if (status == Status.STATUS_ACTIVE) { - if (resourceManagers.size() == 0) { + if (this.resourceManagers.size() == 0) { // nothing to commit status = Status.STATUS_COMMITTED; - } else if (resourceManagers.size() == 1) { + } else if (this.resourceManagers.size() == 1) { // one-phase commit decision status = Status.STATUS_COMMITTING; } else { @@ -244,12 +243,11 @@ } } // resourceManagers is now immutable - rms = resourceManagers; } // one-phase - if (rms.size() == 1) { - ResourceManager manager = (ResourceManager) rms.getFirst(); + if (resourceManagers.size() == 1) { + ResourceManager manager = (ResourceManager) resourceManagers.getFirst(); try { manager.committer.commit(manager.branchId, true); synchronized (this) { @@ -267,74 +265,72 @@ } // two-phase - try { - txnLog.prepare(xid); - } catch (IOException e) { - try { - rollbackResources(rms); - } catch (Exception se) { - log.error("Unable to rollback after failure to log prepare", se.getCause()); - } - SystemException ex = new SystemException("Error logging prepare; transaction was rolled back)"); - ex.initCause(e); - throw ex; + boolean willCommit = internalPrepare(); + + // notify the RMs + if (willCommit) { + commitResources(resourceManagers); + } else { + rollbackResources(resourceManagers); + throw new RollbackException("Unable to commit"); } - for (Iterator i = rms.iterator(); i.hasNext();) { - synchronized (this) { - if (status != Status.STATUS_PREPARING) { - // we were marked for rollback - break; - } - } - ResourceManager manager = (ResourceManager) i.next(); - try { - int vote = manager.committer.prepare(manager.branchId); - if (vote == XAResource.XA_RDONLY) { - // we don't need to consider this RM any more - i.remove(); - } - } catch (XAException e) { - synchronized (this) { - status = Status.STATUS_MARKED_ROLLBACK; - } - } + } finally { + afterCompletion(); + synchronized (this) { + status = Status.STATUS_NO_TRANSACTION; } + } + } - // decision time... - boolean willCommit; + //Used from XATerminator for first phase in a remotely controlled tx. + int prepare() throws SystemException, RollbackException { + beforePrepare(); + int result = XAResource.XA_RDONLY; + try { + LinkedList rms; synchronized (this) { - willCommit = (status != Status.STATUS_MARKED_ROLLBACK); - if (willCommit) { - status = Status.STATUS_PREPARED; + if (status == Status.STATUS_ACTIVE) { + if (resourceManagers.size() == 0) { + // nothing to commit + status = Status.STATUS_COMMITTED; + return result; + } else { + // start prepare part of two-phase + status = Status.STATUS_PREPARING; + } } + // resourceManagers is now immutable + rms = resourceManagers; } - // log our decision - try { - if (willCommit) { - txnLog.commit(xid); - } else { - txnLog.rollback(xid); - } - } catch (IOException e) { - try { - rollbackResources(rms); - } catch (Exception se) { - log.error("Unable to rollback after failure to log decision", se.getCause()); - } - SystemException ex = new SystemException("Error logging decision (outcome is unknown)"); - ex.initCause(e); - throw ex; - } + boolean willCommit = internalPrepare(); // notify the RMs if (willCommit) { - commitResources(rms); + if (!rms.isEmpty()) { + result = XAResource.XA_OK; + } + } else { rollbackResources(rms); throw new RollbackException("Unable to commit"); } } finally { + if (result == XAResource.XA_RDONLY) { + afterCompletion(); + synchronized (this) { + status = Status.STATUS_NO_TRANSACTION; + } + } + } + return result; + } + + //used from XATerminator for commit phase of non-readonly remotely controlled tx. + void preparedCommit() throws SystemException { + try { + commitResources(resourceManagers); + } finally { afterCompletion(); synchronized (this) { status = Status.STATUS_NO_TRANSACTION; @@ -342,17 +338,94 @@ } } + //helper method used by Transaction.commit and XATerminator prepare. + private void beforePrepare() { + synchronized (this) { + switch (status) { + case Status.STATUS_ACTIVE: + case Status.STATUS_MARKED_ROLLBACK: + break; + default: + throw new IllegalStateException("Status is " + getStateString(status)); + } + } + + beforeCompletion(); + endResources(); + } + + + //helper method used by Transaction.commit and XATerminator prepare. + private boolean internalPrepare() throws SystemException { + try { + txnLog.prepare(xid); + } catch (IOException e) { + try { + rollbackResources(resourceManagers); + } catch (Exception se) { + log.error("Unable to rollback after failure to log prepare", se.getCause()); + } + throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e); + } + for (Iterator i = resourceManagers.iterator(); i.hasNext();) { + synchronized (this) { + if (status != Status.STATUS_PREPARING) { + // we were marked for rollback + break; + } + } + ResourceManager manager = (ResourceManager) i.next(); + try { + int vote = manager.committer.prepare(manager.branchId); + if (vote == XAResource.XA_RDONLY) { + // we don't need to consider this RM any more + i.remove(); + } + } catch (XAException e) { + synchronized (this) { + status = Status.STATUS_MARKED_ROLLBACK; + } + } + } + + // decision time... + boolean willCommit; + synchronized (this) { + willCommit = (status != Status.STATUS_MARKED_ROLLBACK); + if (willCommit) { + status = Status.STATUS_PREPARED; + } + } + + // log our decision + try { + if (willCommit) { + txnLog.commit(xid); + } else { + txnLog.rollback(xid); + } + } catch (IOException e) { + try { + rollbackResources(resourceManagers); + } catch (Exception se) { + log.error("Unable to rollback after failure to log decision", se.getCause()); + } + throw (SystemException) new SystemException("Error logging decision (outcome is unknown)").initCause(e); + } + return willCommit; + } + public void rollback() throws IllegalStateException, SystemException { List rms; synchronized (this) { switch (status) { - case Status.STATUS_ACTIVE: - status = Status.STATUS_MARKED_ROLLBACK; - break; - case Status.STATUS_MARKED_ROLLBACK: - break; - default: - throw new IllegalStateException("Status is " + getStateString(status)); + case Status.STATUS_ACTIVE: + status = Status.STATUS_MARKED_ROLLBACK; + break; + case Status.STATUS_MARKED_ROLLBACK: + break; + default: + throw new IllegalStateException("Status is " + getStateString(status)); } rms = resourceManagers; } @@ -368,9 +441,7 @@ } catch (Exception se) { log.error("Unable to rollback after failure to log decision", se.getCause()); } - SystemException ex = new SystemException("Error logging rollback"); - ex.initCause(e); - throw ex; + throw (SystemException) new SystemException("Error logging rollback").initCause(e); } rollbackResources(rms); } finally { @@ -494,28 +565,28 @@ private static String getStateString(int status) { switch (status) { - case Status.STATUS_ACTIVE: - return "STATUS_ACTIVE"; - case Status.STATUS_PREPARING: - return "STATUS_PREPARING"; - case Status.STATUS_PREPARED: - return "STATUS_PREPARED"; - case Status.STATUS_MARKED_ROLLBACK: - return "STATUS_MARKED_ROLLBACK"; - case Status.STATUS_ROLLING_BACK: - return "STATUS_ROLLING_BACK"; - case Status.STATUS_COMMITTING: - return "STATUS_COMMITTING"; - case Status.STATUS_COMMITTED: - return "STATUS_COMMITTED"; - case Status.STATUS_ROLLEDBACK: - return "STATUS_ROLLEDBACK"; - case Status.STATUS_NO_TRANSACTION: - return "STATUS_NO_TRANSACTION"; - case Status.STATUS_UNKNOWN: - return "STATUS_UNKNOWN"; - default: - throw new AssertionError(); + case Status.STATUS_ACTIVE: + return "STATUS_ACTIVE"; + case Status.STATUS_PREPARING: + return "STATUS_PREPARING"; + case Status.STATUS_PREPARED: + return "STATUS_PREPARED"; + case Status.STATUS_MARKED_ROLLBACK: + return "STATUS_MARKED_ROLLBACK"; + case Status.STATUS_ROLLING_BACK: + return "STATUS_ROLLING_BACK"; + case Status.STATUS_COMMITTING: + return "STATUS_COMMITTING"; + case Status.STATUS_COMMITTED: + return "STATUS_COMMITTED"; + case Status.STATUS_ROLLEDBACK: + return "STATUS_ROLLEDBACK"; + case Status.STATUS_NO_TRANSACTION: + return "STATUS_NO_TRANSACTION"; + case Status.STATUS_UNKNOWN: + return "STATUS_UNKNOWN"; + default: + throw new AssertionError(); } } @@ -527,6 +598,7 @@ return false; } } + private static class ResourceManager { private final XAResource committer; 1.2 +63 -2 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionManagerImpl.java Index: TransactionManagerImpl.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionManagerImpl.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- TransactionManagerImpl.java 23 Jan 2004 18:54:16 -0000 1.1 +++ TransactionManagerImpl.java 23 Feb 2004 20:28:43 -0000 1.2 @@ -64,6 +64,8 @@ import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; +import javax.transaction.xa.Xid; +import javax.transaction.xa.XAException; import org.apache.geronimo.transaction.log.UnrecoverableLog; @@ -72,7 +74,7 @@ * * @version $Revision$ $Date$ */ -public class TransactionManagerImpl implements TransactionManager { +public class TransactionManagerImpl implements TransactionManager, XidImporter { private final TransactionLog txnLog; private final XidFactory xidFactory = new XidFactory(); private volatile int timeout; @@ -153,5 +155,64 @@ } finally { threadTx.set(null); } + } + + public Transaction importXid(Xid xid) throws XAException, SystemException { + if (getStatus() != Status.STATUS_NO_TRANSACTION) { + throw new XAException("Transaction already active in this thread"); + } + TransactionImpl tx = new TransactionImpl(xid, xidFactory, txnLog); + threadTx.set(tx); + return tx; + } + + public void commit(Transaction tx, boolean onePhase) throws XAException { + if (onePhase) { + try { + tx.commit(); + } catch (HeuristicMixedException e) { + throw new XAException(); + } catch (HeuristicRollbackException e) { + throw new XAException(); + } catch (RollbackException e) { + throw new XAException(); + } catch (SecurityException e) { + throw new XAException(); + } catch (SystemException e) { + throw new XAException(); + } + } else { + try { + ((TransactionImpl)tx).preparedCommit(); + } catch (SystemException e) { + throw new XAException(); + } + } + } + + public void forget(Transaction tx) throws XAException { + } + + public int prepare(Transaction tx) throws XAException { + try { + return ((TransactionImpl)tx).prepare(); + } catch (SystemException e) { + throw new XAException(); + } catch (RollbackException e) { + throw new XAException(); + } + } + + public void rollback(Transaction tx) throws XAException { + try { + tx.rollback(); + } catch (IllegalStateException e) { + throw new XAException(); + } catch (SystemException e) { + throw new XAException(); + } + } + + public void setTransactionTimeout(long milliseconds) { } } 1.2 +8 -3 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidFactory.java Index: XidFactory.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidFactory.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- XidFactory.java 23 Jan 2004 18:54:16 -0000 1.1 +++ XidFactory.java 23 Feb 2004 20:28:43 -0000 1.2 @@ -67,7 +67,7 @@ * <li>4 or 16 byte IP address of host</li> * <ol> * @version $Revision$ $Date$ - * @todo Should have a way of setting baseId + * todo Should have a way of setting baseId */ public class XidFactory { byte[] baseId = new byte[Xid.MAXGTRIDSIZE]; @@ -106,6 +106,11 @@ } public Xid createBranch(Xid globalId, int branch) { - return new XidImpl(globalId, branch); + byte[] branchId = (byte[]) baseId.clone(); + branchId[0] = (byte) branch; + branchId[1] = (byte) (branch >>> 8); + branchId[2] = (byte) (branch >>> 16); + branchId[3] = (byte) (branch >>> 24); + return new XidImpl(globalId, branchId); } } 1.2 +7 -15 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImpl.java Index: XidImpl.java =================================================================== RCS file: /home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImpl.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- XidImpl.java 23 Jan 2004 18:54:16 -0000 1.1 +++ XidImpl.java 23 Feb 2004 20:28:43 -0000 1.2 @@ -76,7 +76,7 @@ */ public XidImpl(byte[] globalId) { this.globalId = globalId; - this.hash = hash(globalId); + this.hash = hash(0, globalId); branchId = new byte[Xid.MAXBQUALSIZE]; } @@ -85,28 +85,20 @@ * @param global the xid of the global transaction this branch belongs to * @param branch the branch id */ - public XidImpl(Xid global, int branch) { + public XidImpl(Xid global, byte[] branch) { int hash; if (global instanceof XidImpl) { globalId = ((XidImpl) global).globalId; hash = ((XidImpl) global).hash; } else { globalId = global.getGlobalTransactionId(); - hash = hash(globalId); + hash = hash(0, globalId); } - branchId = new byte[Xid.MAXBQUALSIZE]; - branchId[0] = (byte) branch; - branchId[1] = (byte) (branch >>> 8); - branchId[2] = (byte) (branch >>> 16); - branchId[3] = (byte) (branch >>> 24); - for (int i = 0; i < 4; i++) { - hash = (hash * 37) + branchId[i]; - } - this.hash = hash; + branchId = branch; + this.hash = hash(hash, branchId); } - private int hash(byte[] id) { - int hash = 0; + private int hash(int hash, byte[] id) { for (int i = 0; i < id.length; i++) { hash = (hash * 37) + id[i]; } 1.1 incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImporter.java Index: XidImporter.java =================================================================== package org.apache.geronimo.transaction.manager; import javax.transaction.xa.Xid; import javax.transaction.xa.XAException; import javax.transaction.Transaction; import javax.transaction.SystemException; /** * * * @version $Revision: 1.1 $ $Date: 2004/02/23 20:28:43 $ * * */ public interface XidImporter { Transaction importXid(Xid xid) throws XAException, SystemException; void commit(Transaction tx, boolean onePhase) throws XAException; void forget(Transaction tx) throws XAException; int prepare(Transaction tx) throws XAException; void rollback(Transaction tx) throws XAException; void setTransactionTimeout(long milliseconds); }