djencks     2004/02/23 12:28:43

  Modified:    modules/connector/src/java/org/apache/geronimo/connector/work
                        GeronimoWorkManager.java WorkerContext.java
               modules/connector/src/test/org/apache/geronimo/connector/work
                        PooledWorkManagerTest.java
               modules/transaction/src/java/org/apache/geronimo/transaction
                        TransactionManagerProxy.java TransactionProxy.java
               
modules/transaction/src/java/org/apache/geronimo/transaction/manager
                        TransactionImpl.java TransactionManagerImpl.java
                        XidFactory.java XidImpl.java
  Added:       modules/transaction/src/java/org/apache/geronimo/transaction
                        XAWork.java
               
modules/transaction/src/java/org/apache/geronimo/transaction/manager
                        XidImporter.java
  Log:
  add remote tx importing support. no tests yet.
  
  Revision  Changes    Path
  1.2       +17 -29    
incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java
  
  Index: GeronimoWorkManager.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/GeronimoWorkManager.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- GeronimoWorkManager.java  23 Jan 2004 05:56:11 -0000      1.1
  +++ GeronimoWorkManager.java  23 Feb 2004 20:28:42 -0000      1.2
  @@ -66,11 +66,10 @@
   import org.apache.geronimo.connector.work.pool.StartWorkExecutorPool;
   import org.apache.geronimo.connector.work.pool.SyncWorkExecutorPool;
   import org.apache.geronimo.connector.work.pool.WorkExecutorPool;
  -import org.apache.geronimo.gbean.GAttributeInfo;
   import org.apache.geronimo.gbean.GBeanInfo;
   import org.apache.geronimo.gbean.GBeanInfoFactory;
   import org.apache.geronimo.gbean.GConstructorInfo;
  -import org.apache.geronimo.gbean.GOperationInfo;
  +import org.apache.geronimo.transaction.XAWork;
   
   /**
    * WorkManager implementation which uses under the cover three 
WorkExecutorPool
  @@ -90,8 +89,6 @@
       private final static int DEFAULT_MIN_POOL_SIZE = 0;
       private final static int DEFAULT_MAX_POOL_SIZE = 10;
   
  -    private static final GBeanInfo GBEAN_INFO;
  -
       /**
        * Pool of threads used by this WorkManager in order to process
        * the Work instances submitted via the doWork methods.
  @@ -110,21 +107,24 @@
        */
       private final WorkExecutorPool scheduledWorkExecutorPool;
   
  +    private final XAWork xaWork;
  +
       /**
        * Create a WorkManager.
        */
       public GeronimoWorkManager() {
  -        this(DEFAULT_MIN_POOL_SIZE, DEFAULT_MAX_POOL_SIZE);
  +        this(DEFAULT_MIN_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, null);
       }
   
  -    public GeronimoWorkManager(int minSize, int maxSize) {
  -        this(minSize, maxSize, minSize, maxSize, minSize, maxSize);
  +    public GeronimoWorkManager(int minSize, int maxSize, XAWork xaWork) {
  +        this(minSize, maxSize, minSize, maxSize, minSize, maxSize, xaWork);
       }
   
  -    public GeronimoWorkManager(int syncMinSize, int syncMaxSize, int 
startMinSize, int startMaxSize, int schedMinSize, int schedMaxSize) {
  +    public GeronimoWorkManager(int syncMinSize, int syncMaxSize, int 
startMinSize, int startMaxSize, int schedMinSize, int schedMaxSize, XAWork 
xaWork) {
           syncWorkExecutorPool = new SyncWorkExecutorPool(syncMinSize, 
syncMaxSize);
           startWorkExecutorPool = new StartWorkExecutorPool(startMinSize, 
startMaxSize);
           scheduledWorkExecutorPool = new 
ScheduleWorkExecutorPool(schedMinSize, schedMaxSize);
  +        this.xaWork = xaWork;
       }
   
       public int getSyncThreadCount() {
  @@ -204,7 +204,7 @@
               WorkListener workListener)
               throws WorkException {
           WorkerContext workWrapper =
  -                new WorkerContext(work, startTimeout, execContext, 
workListener);
  +                new WorkerContext(work, startTimeout, execContext, xaWork, 
workListener);
           workWrapper.setThreadPriority(Thread.currentThread().getPriority());
           syncWorkExecutorPool.executeWork(workWrapper);
       }
  @@ -229,7 +229,7 @@
               WorkListener workListener)
               throws WorkException {
           WorkerContext workWrapper =
  -                new WorkerContext(work, startTimeout, execContext, 
workListener);
  +                new WorkerContext(work, startTimeout, execContext, xaWork, 
workListener);
           workWrapper.setThreadPriority(Thread.currentThread().getPriority());
           startWorkExecutorPool.executeWork(workWrapper);
           return System.currentTimeMillis() - workWrapper.getAcceptedTime();
  @@ -254,31 +254,19 @@
               WorkListener workListener)
               throws WorkException {
           WorkerContext workWrapper =
  -                new WorkerContext(work, startTimeout, execContext, 
workListener);
  +                new WorkerContext(work, startTimeout, execContext, xaWork, 
workListener);
           workWrapper.setThreadPriority(Thread.currentThread().getPriority());
           scheduledWorkExecutorPool.executeWork(workWrapper);
       }
   
  +    public static final GBeanInfo GBEAN_INFO;
  +
       static {
           GBeanInfoFactory infoFactory = new 
GBeanInfoFactory(GeronimoWorkManager.class.getName());
  -        infoFactory.addAttribute(new GAttributeInfo("SyncThreadCount", 
true));
  -        infoFactory.addAttribute(new GAttributeInfo("SyncMinimumPoolSize", 
true));
  -        infoFactory.addAttribute(new GAttributeInfo("SyncMaximumPoolSize", 
true));
  -        infoFactory.addAttribute(new GAttributeInfo("StartThreadCount", 
true));
  -        infoFactory.addAttribute(new GAttributeInfo("StartMinimumPoolSize", 
true));
  -        infoFactory.addAttribute(new GAttributeInfo("StartMaximumPoolSize", 
true));
  -        infoFactory.addAttribute(new GAttributeInfo("ScheduledThreadCount", 
true));
  -        infoFactory.addAttribute(new 
GAttributeInfo("ScheduledMinimumPoolSize", true));
  -        infoFactory.addAttribute(new 
GAttributeInfo("ScheduledMaximumPoolSize", true));
  -        infoFactory.addOperation(new GOperationInfo("doWork", new 
String[]{Work.class.getName()}));
  -        infoFactory.addOperation(new GOperationInfo("doWork", new 
String[]{Work.class.getName(), Long.TYPE.getName(), 
ExecutionContext.class.getName(), WorkListener.class.getName()}));
  -        infoFactory.addOperation(new GOperationInfo("startWork", new 
String[]{Work.class.getName()}));
  -        infoFactory.addOperation(new GOperationInfo("startWork", new 
String[]{Work.class.getName(), Long.TYPE.getName(), 
ExecutionContext.class.getName(), WorkListener.class.getName()}));
  -        infoFactory.addOperation(new GOperationInfo("scheduleWork", new 
String[]{Work.class.getName()}));
  -        infoFactory.addOperation(new GOperationInfo("scheduleWork", new 
String[]{Work.class.getName(), Long.TYPE.getName(), 
ExecutionContext.class.getName(), WorkListener.class.getName()}));
  +        infoFactory.addInterface(WorkManager.class, new 
String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", "StartMinimumPoolSize", 
"StartMaximumPoolSize", "ScheduledMinimumPoolSize", "ScheduledMaximumPoolSize", 
"XAWork"});
           infoFactory.setConstructor(new GConstructorInfo(
  -                new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", 
"StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize", 
"ScheduledMaximumPoolSize"},
  -                new Class[]{Integer.TYPE, Integer.TYPE, Integer.TYPE, 
Integer.TYPE, Integer.TYPE, Integer.TYPE}));
  +                new String[]{"SyncMinimumPoolSize", "SyncMaximumPoolSize", 
"StartMinimumPoolSize", "StartMaximumPoolSize", "ScheduledMinimumPoolSize", 
"ScheduledMaximumPoolSize", "XAWork"},
  +                new Class[]{Integer.TYPE, Integer.TYPE, Integer.TYPE, 
Integer.TYPE, Integer.TYPE, Integer.TYPE, XAWork.class}));
           GBEAN_INFO = infoFactory.getBeanInfo();
       }
   
  
  
  
  1.2       +22 -5     
incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java
  
  Index: WorkerContext.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- WorkerContext.java        23 Jan 2004 05:56:11 -0000      1.1
  +++ WorkerContext.java        23 Feb 2004 20:28:42 -0000      1.2
  @@ -67,6 +67,7 @@
   
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
  +import org.apache.geronimo.transaction.XAWork;
   
   import EDU.oswego.cs.dl.util.concurrent.Latch;
   
  @@ -118,7 +119,9 @@
       /**
        * Execution context of the actual work to be executed.
        */
  -    private ExecutionContext executionContext;
  +    private final ExecutionContext executionContext;
  +
  +    private final XAWork xaWork;
   
       /**
        * Listener to be notified during the life-cycle of the work treatment.
  @@ -147,6 +150,8 @@
        */
       public WorkerContext(Work aWork) {
           adaptee = aWork;
  +        executionContext = null;
  +        xaWork = null;
       }
   
       /**
  @@ -162,11 +167,13 @@
        * work completed) occur.
        */
       public WorkerContext(Work aWork, long aStartTimeout,
  -            ExecutionContext execContext,
  -            WorkListener workListener) {
  +                         ExecutionContext execContext,
  +                         XAWork xaWork,
  +                         WorkListener workListener) {
           adaptee = aWork;
           startTimeOut = aStartTimeout;
           executionContext = execContext;
  +        this.xaWork = xaWork;
           if (null != workListener) {
               this.workListener = workListener;
           }
  @@ -303,7 +310,17 @@
                   new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null));
           startLatch.release();
           try {
  -            adaptee.run();
  +            if (executionContext == null || executionContext.getXid() == 
null) {
  +                adaptee.run();
  +            } else {
  +                try {
  +                    xaWork.begin(executionContext.getXid(), 
executionContext.getTransactionTimeout());
  +                    adaptee.run();
  +                } finally {
  +                    xaWork.end(executionContext.getXid());
  +                }
  +
  +            }
               workListener.workCompleted(
                       new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee, 
null));
           } catch (Throwable e) {
  
  
  
  1.2       +2 -2      
incubator-geronimo/modules/connector/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java
  
  Index: PooledWorkManagerTest.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/connector/src/test/org/apache/geronimo/connector/work/PooledWorkManagerTest.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- PooledWorkManagerTest.java        23 Jan 2004 05:56:11 -0000      1.1
  +++ PooledWorkManagerTest.java        23 Feb 2004 20:28:43 -0000      1.2
  @@ -84,7 +84,7 @@
       private static final int m_tempo = 200;
   
       protected void setUp() throws Exception {
  -        m_workManager = new GeronimoWorkManager(1, 1);
  +        m_workManager = new GeronimoWorkManager(1, 1, null);
       }
   
       public void testDoWork() throws Exception {
  
  
  
  1.3       +98 -15    
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionManagerProxy.java
  
  Index: TransactionManagerProxy.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionManagerProxy.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- TransactionManagerProxy.java      30 Jan 2004 01:32:00 -0000      1.2
  +++ TransactionManagerProxy.java      23 Feb 2004 20:28:43 -0000      1.3
  @@ -55,6 +55,11 @@
    */
   package org.apache.geronimo.transaction;
   
  +import java.util.Map;
  +import java.util.HashMap;
  +import java.util.Set;
  +import java.util.HashSet;
  +
   import javax.resource.spi.XATerminator;
   import javax.transaction.HeuristicMixedException;
   import javax.transaction.HeuristicRollbackException;
  @@ -74,6 +79,7 @@
   import org.apache.geronimo.gbean.GConstructorInfo;
   import org.apache.geronimo.gbean.GAttributeInfo;
   import org.apache.geronimo.transaction.manager.TransactionManagerImpl;
  +import org.apache.geronimo.transaction.manager.XidImporter;
   
   /**
    * A wrapper for a TransactionManager that wraps all Transactions in a 
TransactionProxy
  @@ -83,19 +89,23 @@
    *
    * @version $Revision$ $Date$
    */
  -public class TransactionManagerProxy implements TransactionManager, 
XATerminator {
  +public class TransactionManagerProxy implements TransactionManager, 
XATerminator, XAWork {
   
       public static final GBeanInfo GBEAN_INFO;
   
       private final TransactionManager delegate;
  +    private final XidImporter importer;
       private final ThreadLocal threadTx = new ThreadLocal();
  +    private final Map importedTransactions = new HashMap();
  +    private Set activeTransactions = new HashSet();
   
       /**
        * Constructor taking the TransactionManager to wrap.
        * @param delegate the TransactionManager that should be wrapped
        */
  -    public TransactionManagerProxy(TransactionManager delegate) {
  +    public TransactionManagerProxy(TransactionManager delegate, XidImporter 
importer) {
           this.delegate = delegate;
  +        this.importer = importer;
       }
   
       /**
  @@ -103,6 +113,7 @@
        */
       public TransactionManagerProxy() {
           this.delegate = new TransactionManagerImpl();
  +        this.importer = null;
       }
   
       public void setTransactionTimeout(int timeout) throws SystemException {
  @@ -174,22 +185,55 @@
       /**
        * @see javax.resource.spi.XATerminator#commit(javax.transaction.xa.Xid, 
boolean)
        */
  -    public void commit(Xid arg0, boolean arg1) throws XAException {
  -        throw new XAException("Not implemented.");
  +    public void commit(Xid xid, boolean onePhase) throws XAException {
  +        TransactionProxy tx = (TransactionProxy) 
importedTransactions.remove(xid);
  +        if (tx == null) {
  +            throw new XAException("No imported transaction for xid: " + xid);
  +        }
  +
  +        try {
  +            int status = tx.getStatus();
  +            assert status == Status.STATUS_ACTIVE || status == 
Status.STATUS_PREPARED;
  +        } catch (SystemException e) {
  +            throw new XAException();
  +        }
  +        importer.commit(tx.getDelegate(), onePhase);
       }
   
       /**
        * @see javax.resource.spi.XATerminator#forget(javax.transaction.xa.Xid)
        */
  -    public void forget(Xid arg0) throws XAException {
  -        throw new XAException("Not implemented.");
  +    public void forget(Xid xid) throws XAException {
  +        TransactionProxy tx = (TransactionProxy) 
importedTransactions.remove(xid);
  +        if (tx == null) {
  +            throw new XAException("No imported transaction for xid: " + xid);
  +        }
  +
  +        try {
  +            int status = tx.getStatus();
  +            //assert status == Status.STATUS_ACTIVE || status == 
Status.STATUS_PREPARED;
  +        } catch (SystemException e) {
  +            throw new XAException();
  +        }
  +        importer.forget(tx.getDelegate());
       }
   
       /**
        * @see javax.resource.spi.XATerminator#prepare(javax.transaction.xa.Xid)
        */
  -    public int prepare(Xid arg0) throws XAException {
  -        throw new XAException("Not implemented.");
  +    public int prepare(Xid xid) throws XAException {
  +        TransactionProxy tx = (TransactionProxy) 
importedTransactions.get(xid);
  +        if (tx == null) {
  +            throw new XAException("No imported transaction for xid: " + xid);
  +        }
  +
  +        try {
  +            int status = tx.getStatus();
  +            assert status == Status.STATUS_ACTIVE;
  +        } catch (SystemException e) {
  +            throw new XAException();
  +        }
  +        return importer.prepare(tx.getDelegate());
       }
   
       /**
  @@ -202,8 +246,47 @@
       /**
        * @see 
javax.resource.spi.XATerminator#rollback(javax.transaction.xa.Xid)
        */
  -    public void rollback(Xid arg0) throws XAException {
  -        throw new XAException("Not implemented.");
  +    public void rollback(Xid xid) throws XAException {
  +        TransactionProxy tx = (TransactionProxy) 
importedTransactions.remove(xid);
  +        if (tx == null) {
  +            throw new XAException("No imported transaction for xid: " + xid);
  +        }
  +
  +        try {
  +            int status = tx.getStatus();
  +            assert status == Status.STATUS_ACTIVE || status == 
Status.STATUS_PREPARED;
  +        } catch (SystemException e) {
  +            throw new XAException();
  +        }
  +        importer.rollback(tx.getDelegate());
  +    }
  +
  +    public void begin(Xid xid, long txTimeoutMillis) throws XAException {
  +        TransactionProxy tx = (TransactionProxy) 
importedTransactions.get(xid);
  +        if (tx == null) {
  +            try {
  +                tx = new TransactionProxy(importer.importXid(xid));
  +            } catch (SystemException e) {
  +                throw (XAException)new XAException("Could not import 
xid").initCause(e);
  +            }
  +            importedTransactions.put(xid, tx);
  +        }
  +        if (activeTransactions.contains(tx)) {
  +            throw new XAException("Xid already active");
  +        }
  +        activeTransactions.add(tx);
  +        threadTx.set(tx);
  +        importer.setTransactionTimeout(txTimeoutMillis);
  +    }
  +
  +    public void end(Xid xid) throws XAException {
  +        TransactionProxy tx = (TransactionProxy) 
importedTransactions.get(xid);
  +        if (tx == null) {
  +            throw new XAException("No imported transaction for xid: " + xid);
  +        }
  +        if (!activeTransactions.remove(tx)) {
  +            throw new XAException("tx not active for xid: " + xid);
  +        }
       }
   
       //for now we use the default constructor.
  @@ -211,17 +294,17 @@
           GBeanInfoFactory infoFactory = new 
GBeanInfoFactory(TransactionManagerProxy.class.getName());
   
           infoFactory.setConstructor(new GConstructorInfo(
  -                new String[] { "Delegate" },
  -                new Class[] { TransactionManager.class }));
  +                new String[]{"Delegate"},
  +                new Class[]{TransactionManager.class}));
   
           infoFactory.addAttribute(new GAttributeInfo("Delegate", true));
   
  -        infoFactory.addOperation(new GOperationInfo("setTransactionTimeout", 
new String[] {Integer.TYPE.getName()}));
  +        infoFactory.addOperation(new GOperationInfo("setTransactionTimeout", 
new String[]{Integer.TYPE.getName()}));
           infoFactory.addOperation(new GOperationInfo("begin"));
           infoFactory.addOperation(new GOperationInfo("getStatus"));
           infoFactory.addOperation(new GOperationInfo("getTransaction"));
           infoFactory.addOperation(new GOperationInfo("suspend"));
  -        infoFactory.addOperation(new GOperationInfo("resume", new String[] 
{Transaction.class.getName()}));
  +        infoFactory.addOperation(new GOperationInfo("resume", new 
String[]{Transaction.class.getName()}));
           infoFactory.addOperation(new GOperationInfo("commit"));
           infoFactory.addOperation(new GOperationInfo("rollback"));
           infoFactory.addOperation(new GOperationInfo("setRollbackOnly"));
  
  
  
  1.2       +5 -1      
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionProxy.java
  
  Index: TransactionProxy.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/TransactionProxy.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TransactionProxy.java     23 Jan 2004 18:54:15 -0000      1.1
  +++ TransactionProxy.java     23 Feb 2004 20:28:43 -0000      1.2
  @@ -105,4 +105,8 @@
       public void setRollbackOnly() throws IllegalStateException, 
SystemException {
           delegate.setRollbackOnly();
       }
  +
  +    Transaction getDelegate() {
  +        return delegate;
  +    }
   }
  
  
  
  1.1                  
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/XAWork.java
  
  Index: XAWork.java
  ===================================================================
  package org.apache.geronimo.transaction;
  
  import javax.transaction.xa.Xid;
  import javax.transaction.xa.XAException;
  
  /**
   * primarily an interface between the WorkManager/ExecutionContext and the tm.
   *
   * @version $Revision: 1.1 $ $Date: 2004/02/23 20:28:43 $
   *
   * */
  public interface XAWork {
      void begin(Xid xid, long txTimeout) throws XAException;
      void end(Xid xid) throws XAException;
  }
  
  
  
  1.2       +204 -132  
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionImpl.java
  
  Index: TransactionImpl.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionImpl.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TransactionImpl.java      23 Jan 2004 18:54:16 -0000      1.1
  +++ TransactionImpl.java      23 Feb 2004 20:28:43 -0000      1.2
  @@ -63,6 +63,7 @@
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
  +
   import javax.transaction.HeuristicMixedException;
   import javax.transaction.HeuristicRollbackException;
   import javax.transaction.RollbackException;
  @@ -94,9 +95,13 @@
       private Map xaResources = new HashMap(3);
   
       TransactionImpl(XidFactory xidFactory, TransactionLog txnLog) throws 
SystemException {
  +        this(xidFactory.createXid(), xidFactory, txnLog);
  +    }
  +
  +    TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog) 
throws SystemException {
           this.xidFactory = xidFactory;
           this.txnLog = txnLog;
  -        this.xid = xidFactory.createXid();
  +        this.xid = xid;
           try {
               txnLog.begin(xid);
           } catch (IOException e) {
  @@ -114,16 +119,16 @@
   
       public synchronized void setRollbackOnly() throws IllegalStateException, 
SystemException {
           switch (status) {
  -        case Status.STATUS_ACTIVE:
  -        case Status.STATUS_PREPARING:
  -            status = Status.STATUS_MARKED_ROLLBACK;
  -            break;
  -        case Status.STATUS_MARKED_ROLLBACK:
  -        case Status.STATUS_ROLLING_BACK:
  -            // nothing to do
  -            break;
  -        default:
  -            throw new IllegalStateException("Cannot set rollback only, 
status is " + getStateString(status));
  +            case Status.STATUS_ACTIVE:
  +            case Status.STATUS_PREPARING:
  +                status = Status.STATUS_MARKED_ROLLBACK;
  +                break;
  +            case Status.STATUS_MARKED_ROLLBACK:
  +            case Status.STATUS_ROLLING_BACK:
  +                // nothing to do
  +                break;
  +            default:
  +                throw new IllegalStateException("Cannot set rollback only, 
status is " + getStateString(status));
           }
       }
   
  @@ -132,13 +137,13 @@
               throw new IllegalArgumentException("Synchronization is null");
           }
           switch (status) {
  -        case Status.STATUS_ACTIVE:
  -        case Status.STATUS_PREPARING:
  -            break;
  -        case Status.STATUS_MARKED_ROLLBACK:
  -            throw new RollbackException("Transaction is marked for 
rollback");
  -        default:
  -            throw new IllegalStateException("Status is " + 
getStateString(status));
  +            case Status.STATUS_ACTIVE:
  +            case Status.STATUS_PREPARING:
  +                break;
  +            case Status.STATUS_MARKED_ROLLBACK:
  +                throw new RollbackException("Transaction is marked for 
rollback");
  +            default:
  +                throw new IllegalStateException("Status is " + 
getStateString(status));
           }
           syncList.add(synch);
       }
  @@ -148,12 +153,12 @@
               throw new IllegalArgumentException("XAResource is null");
           }
           switch (status) {
  -        case Status.STATUS_ACTIVE:
  -            break;
  -        case Status.STATUS_MARKED_ROLLBACK:
  -            throw new RollbackException("Transaction is marked for 
rollback");
  -        default:
  -            throw new IllegalStateException("Status is " + 
getStateString(status));
  +            case Status.STATUS_ACTIVE:
  +                break;
  +            case Status.STATUS_MARKED_ROLLBACK:
  +                throw new RollbackException("Transaction is marked for 
rollback");
  +            default:
  +                throw new IllegalStateException("Status is " + 
getStateString(status));
           }
   
           try {
  @@ -196,11 +201,11 @@
               throw new IllegalArgumentException("XAResource is null");
           }
           switch (status) {
  -        case Status.STATUS_ACTIVE:
  -        case Status.STATUS_MARKED_ROLLBACK:
  -            break;
  -        default:
  -            throw new IllegalStateException("Status is " + 
getStateString(status));
  +            case Status.STATUS_ACTIVE:
  +            case Status.STATUS_MARKED_ROLLBACK:
  +                break;
  +            default:
  +                throw new IllegalStateException("Status is " + 
getStateString(status));
           }
           ResourceManager manager = (ResourceManager) 
xaResources.remove(xaRes);
           if (manager == null) {
  @@ -215,27 +220,21 @@
           }
       }
   
  +    //Transaction method, does 2pc
       public void commit() throws HeuristicMixedException, 
HeuristicRollbackException, RollbackException, SecurityException, 
SystemException {
  -        synchronized (this) {
  -            switch (status) {
  -            case Status.STATUS_ACTIVE:
  -            case Status.STATUS_MARKED_ROLLBACK:
  -                break;
  -            default:
  -                throw new IllegalStateException("Status is " + 
getStateString(status));
  -            }
  -        }
  +        beforePrepare();
   
  -        beforeCompletion();
  -        endResources();
           try {
  -            LinkedList rms;
  +            if (status == Status.STATUS_MARKED_ROLLBACK) {
  +                rollbackResources(resourceManagers);
  +                throw new RollbackException("Unable to commit");
  +            }
               synchronized (this) {
                   if (status == Status.STATUS_ACTIVE) {
  -                    if (resourceManagers.size() == 0) {
  +                    if (this.resourceManagers.size() == 0) {
                           // nothing to commit
                           status = Status.STATUS_COMMITTED;
  -                    } else if (resourceManagers.size() == 1) {
  +                    } else if (this.resourceManagers.size() == 1) {
                           // one-phase commit decision
                           status = Status.STATUS_COMMITTING;
                       } else {
  @@ -244,12 +243,11 @@
                       }
                   }
                   // resourceManagers is now immutable
  -                rms = resourceManagers;
               }
   
               // one-phase
  -            if (rms.size() == 1) {
  -                ResourceManager manager = (ResourceManager) rms.getFirst();
  +            if (resourceManagers.size() == 1) {
  +                ResourceManager manager = (ResourceManager) 
resourceManagers.getFirst();
                   try {
                       manager.committer.commit(manager.branchId, true);
                       synchronized (this) {
  @@ -267,74 +265,72 @@
               }
   
               // two-phase
  -            try {
  -                txnLog.prepare(xid);
  -            } catch (IOException e) {
  -                try {
  -                    rollbackResources(rms);
  -                } catch (Exception se) {
  -                    log.error("Unable to rollback after failure to log 
prepare", se.getCause());
  -                }
  -                SystemException ex = new SystemException("Error logging 
prepare; transaction was rolled back)");
  -                ex.initCause(e);
  -                throw ex;
  +            boolean willCommit = internalPrepare();
  +
  +            // notify the RMs
  +            if (willCommit) {
  +                commitResources(resourceManagers);
  +            } else {
  +                rollbackResources(resourceManagers);
  +                throw new RollbackException("Unable to commit");
               }
  -            for (Iterator i = rms.iterator(); i.hasNext();) {
  -                synchronized (this) {
  -                    if (status != Status.STATUS_PREPARING) {
  -                        // we were marked for rollback
  -                        break;
  -                    }
  -                }
  -                ResourceManager manager = (ResourceManager) i.next();
  -                try {
  -                    int vote = manager.committer.prepare(manager.branchId);
  -                    if (vote == XAResource.XA_RDONLY) {
  -                        // we don't need to consider this RM any more
  -                        i.remove();
  -                    }
  -                } catch (XAException e) {
  -                    synchronized (this) {
  -                        status = Status.STATUS_MARKED_ROLLBACK;
  -                    }
  -                }
  +        } finally {
  +            afterCompletion();
  +            synchronized (this) {
  +                status = Status.STATUS_NO_TRANSACTION;
               }
  +        }
  +    }
   
  -            // decision time...
  -            boolean willCommit;
  +    //Used from XATerminator for first phase in a remotely controlled tx.
  +    int prepare() throws SystemException, RollbackException {
  +        beforePrepare();
  +        int result = XAResource.XA_RDONLY;
  +        try {
  +            LinkedList rms;
               synchronized (this) {
  -                willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
  -                if (willCommit) {
  -                    status = Status.STATUS_PREPARED;
  +                if (status == Status.STATUS_ACTIVE) {
  +                    if (resourceManagers.size() == 0) {
  +                        // nothing to commit
  +                        status = Status.STATUS_COMMITTED;
  +                        return result;
  +                    } else {
  +                        // start prepare part of two-phase
  +                        status = Status.STATUS_PREPARING;
  +                    }
                   }
  +                // resourceManagers is now immutable
  +                rms = resourceManagers;
               }
   
  -            // log our decision
  -            try {
  -                if (willCommit) {
  -                    txnLog.commit(xid);
  -                } else {
  -                    txnLog.rollback(xid);
  -                }
  -            } catch (IOException e) {
  -                try {
  -                    rollbackResources(rms);
  -                } catch (Exception se) {
  -                    log.error("Unable to rollback after failure to log 
decision", se.getCause());
  -                }
  -                SystemException ex = new SystemException("Error logging 
decision (outcome is unknown)");
  -                ex.initCause(e);
  -                throw ex;
  -            }
  +            boolean willCommit = internalPrepare();
   
               // notify the RMs
               if (willCommit) {
  -                commitResources(rms);
  +                if (!rms.isEmpty()) {
  +                    result = XAResource.XA_OK;
  +                }
  +
               } else {
                   rollbackResources(rms);
                   throw new RollbackException("Unable to commit");
               }
           } finally {
  +            if (result == XAResource.XA_RDONLY) {
  +                afterCompletion();
  +                synchronized (this) {
  +                    status = Status.STATUS_NO_TRANSACTION;
  +                }
  +            }
  +        }
  +        return result;
  +    }
  +
  +    //used from XATerminator for commit phase of non-readonly remotely 
controlled tx.
  +    void preparedCommit() throws SystemException {
  +        try {
  +            commitResources(resourceManagers);
  +        } finally {
               afterCompletion();
               synchronized (this) {
                   status = Status.STATUS_NO_TRANSACTION;
  @@ -342,17 +338,94 @@
           }
       }
   
  +    //helper method used by Transaction.commit and XATerminator prepare.
  +    private void beforePrepare() {
  +        synchronized (this) {
  +            switch (status) {
  +                case Status.STATUS_ACTIVE:
  +                case Status.STATUS_MARKED_ROLLBACK:
  +                    break;
  +                default:
  +                    throw new IllegalStateException("Status is " + 
getStateString(status));
  +            }
  +        }
  +
  +        beforeCompletion();
  +        endResources();
  +    }
  +
  +
  +    //helper method used by Transaction.commit and XATerminator prepare.
  +    private boolean internalPrepare() throws SystemException {
  +        try {
  +            txnLog.prepare(xid);
  +        } catch (IOException e) {
  +            try {
  +                rollbackResources(resourceManagers);
  +            } catch (Exception se) {
  +                log.error("Unable to rollback after failure to log prepare", 
se.getCause());
  +            }
  +            throw (SystemException) new SystemException("Error logging 
prepare; transaction was rolled back)").initCause(e);
  +        }
  +        for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
  +            synchronized (this) {
  +                if (status != Status.STATUS_PREPARING) {
  +                    // we were marked for rollback
  +                    break;
  +                }
  +            }
  +            ResourceManager manager = (ResourceManager) i.next();
  +            try {
  +                int vote = manager.committer.prepare(manager.branchId);
  +                if (vote == XAResource.XA_RDONLY) {
  +                    // we don't need to consider this RM any more
  +                    i.remove();
  +                }
  +            } catch (XAException e) {
  +                synchronized (this) {
  +                    status = Status.STATUS_MARKED_ROLLBACK;
  +                }
  +            }
  +        }
  +
  +        // decision time...
  +        boolean willCommit;
  +        synchronized (this) {
  +            willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
  +            if (willCommit) {
  +                status = Status.STATUS_PREPARED;
  +            }
  +        }
  +
  +        // log our decision
  +        try {
  +            if (willCommit) {
  +                txnLog.commit(xid);
  +            } else {
  +                txnLog.rollback(xid);
  +            }
  +        } catch (IOException e) {
  +            try {
  +                rollbackResources(resourceManagers);
  +            } catch (Exception se) {
  +                log.error("Unable to rollback after failure to log 
decision", se.getCause());
  +            }
  +            throw (SystemException) new SystemException("Error logging 
decision (outcome is unknown)").initCause(e);
  +        }
  +        return willCommit;
  +    }
  +
       public void rollback() throws IllegalStateException, SystemException {
           List rms;
           synchronized (this) {
               switch (status) {
  -            case Status.STATUS_ACTIVE:
  -                status = Status.STATUS_MARKED_ROLLBACK;
  -                break;
  -            case Status.STATUS_MARKED_ROLLBACK:
  -                break;
  -            default:
  -                throw new IllegalStateException("Status is " + 
getStateString(status));
  +                case Status.STATUS_ACTIVE:
  +                    status = Status.STATUS_MARKED_ROLLBACK;
  +                    break;
  +                case Status.STATUS_MARKED_ROLLBACK:
  +                    break;
  +                default:
  +                    throw new IllegalStateException("Status is " + 
getStateString(status));
               }
               rms = resourceManagers;
           }
  @@ -368,9 +441,7 @@
                   } catch (Exception se) {
                       log.error("Unable to rollback after failure to log 
decision", se.getCause());
                   }
  -                SystemException ex = new SystemException("Error logging 
rollback");
  -                ex.initCause(e);
  -                throw ex;
  +                throw (SystemException) new SystemException("Error logging 
rollback").initCause(e);
               }
               rollbackResources(rms);
           } finally {
  @@ -494,28 +565,28 @@
   
       private static String getStateString(int status) {
           switch (status) {
  -        case Status.STATUS_ACTIVE:
  -            return "STATUS_ACTIVE";
  -        case Status.STATUS_PREPARING:
  -            return "STATUS_PREPARING";
  -        case Status.STATUS_PREPARED:
  -            return "STATUS_PREPARED";
  -        case Status.STATUS_MARKED_ROLLBACK:
  -            return "STATUS_MARKED_ROLLBACK";
  -        case Status.STATUS_ROLLING_BACK:
  -            return "STATUS_ROLLING_BACK";
  -        case Status.STATUS_COMMITTING:
  -            return "STATUS_COMMITTING";
  -        case Status.STATUS_COMMITTED:
  -            return "STATUS_COMMITTED";
  -        case Status.STATUS_ROLLEDBACK:
  -            return "STATUS_ROLLEDBACK";
  -        case Status.STATUS_NO_TRANSACTION:
  -            return "STATUS_NO_TRANSACTION";
  -        case Status.STATUS_UNKNOWN:
  -            return "STATUS_UNKNOWN";
  -        default:
  -            throw new AssertionError();
  +            case Status.STATUS_ACTIVE:
  +                return "STATUS_ACTIVE";
  +            case Status.STATUS_PREPARING:
  +                return "STATUS_PREPARING";
  +            case Status.STATUS_PREPARED:
  +                return "STATUS_PREPARED";
  +            case Status.STATUS_MARKED_ROLLBACK:
  +                return "STATUS_MARKED_ROLLBACK";
  +            case Status.STATUS_ROLLING_BACK:
  +                return "STATUS_ROLLING_BACK";
  +            case Status.STATUS_COMMITTING:
  +                return "STATUS_COMMITTING";
  +            case Status.STATUS_COMMITTED:
  +                return "STATUS_COMMITTED";
  +            case Status.STATUS_ROLLEDBACK:
  +                return "STATUS_ROLLEDBACK";
  +            case Status.STATUS_NO_TRANSACTION:
  +                return "STATUS_NO_TRANSACTION";
  +            case Status.STATUS_UNKNOWN:
  +                return "STATUS_UNKNOWN";
  +            default:
  +                throw new AssertionError();
           }
       }
   
  @@ -527,6 +598,7 @@
               return false;
           }
       }
  +
   
       private static class ResourceManager {
           private final XAResource committer;
  
  
  
  1.2       +63 -2     
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionManagerImpl.java
  
  Index: TransactionManagerImpl.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/TransactionManagerImpl.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- TransactionManagerImpl.java       23 Jan 2004 18:54:16 -0000      1.1
  +++ TransactionManagerImpl.java       23 Feb 2004 20:28:43 -0000      1.2
  @@ -64,6 +64,8 @@
   import javax.transaction.SystemException;
   import javax.transaction.Transaction;
   import javax.transaction.TransactionManager;
  +import javax.transaction.xa.Xid;
  +import javax.transaction.xa.XAException;
   
   import org.apache.geronimo.transaction.log.UnrecoverableLog;
   
  @@ -72,7 +74,7 @@
    *
    * @version $Revision$ $Date$
    */
  -public class TransactionManagerImpl implements TransactionManager {
  +public class TransactionManagerImpl implements TransactionManager, 
XidImporter {
       private final TransactionLog txnLog;
       private final XidFactory xidFactory = new XidFactory();
       private volatile int timeout;
  @@ -153,5 +155,64 @@
           } finally {
               threadTx.set(null);
           }
  +    }
  +
  +    public Transaction importXid(Xid xid) throws XAException, 
SystemException {
  +        if (getStatus() != Status.STATUS_NO_TRANSACTION) {
  +            throw new XAException("Transaction already active in this 
thread");
  +        }
  +        TransactionImpl tx = new TransactionImpl(xid, xidFactory, txnLog);
  +        threadTx.set(tx);
  +        return tx;
  +    }
  +
  +    public void commit(Transaction tx, boolean onePhase) throws XAException {
  +        if (onePhase) {
  +            try {
  +                tx.commit();
  +            } catch (HeuristicMixedException e) {
  +                throw new XAException();
  +            } catch (HeuristicRollbackException e) {
  +                throw new XAException();
  +            } catch (RollbackException e) {
  +                throw new XAException();
  +            } catch (SecurityException e) {
  +                throw new XAException();
  +            } catch (SystemException e) {
  +                throw new XAException();
  +            }
  +        } else {
  +            try {
  +                ((TransactionImpl)tx).preparedCommit();
  +            } catch (SystemException e) {
  +                throw new XAException();
  +            }
  +        }
  +    }
  +
  +    public void forget(Transaction tx) throws XAException {
  +    }
  +
  +    public int prepare(Transaction tx) throws XAException {
  +        try {
  +            return ((TransactionImpl)tx).prepare();
  +        } catch (SystemException e) {
  +            throw new XAException();
  +        } catch (RollbackException e) {
  +            throw new XAException();
  +        }
  +    }
  +
  +    public void rollback(Transaction tx) throws XAException {
  +        try {
  +            tx.rollback();
  +        } catch (IllegalStateException e) {
  +            throw new XAException();
  +        } catch (SystemException e) {
  +            throw new XAException();
  +        }
  +    }
  +
  +    public void setTransactionTimeout(long milliseconds) {
       }
   }
  
  
  
  1.2       +8 -3      
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidFactory.java
  
  Index: XidFactory.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidFactory.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- XidFactory.java   23 Jan 2004 18:54:16 -0000      1.1
  +++ XidFactory.java   23 Feb 2004 20:28:43 -0000      1.2
  @@ -67,7 +67,7 @@
    * <li>4 or 16 byte IP address of host</li>
    * <ol>
    * @version $Revision$ $Date$
  - * @todo Should have a way of setting baseId
  + * todo Should have a way of setting baseId
    */
   public class XidFactory {
       byte[] baseId = new byte[Xid.MAXGTRIDSIZE];
  @@ -106,6 +106,11 @@
       }
   
       public Xid createBranch(Xid globalId, int branch) {
  -        return new XidImpl(globalId, branch);
  +        byte[] branchId = (byte[]) baseId.clone();
  +        branchId[0] = (byte) branch;
  +        branchId[1] = (byte) (branch >>> 8);
  +        branchId[2] = (byte) (branch >>> 16);
  +        branchId[3] = (byte) (branch >>> 24);
  +        return new XidImpl(globalId, branchId);
       }
   }
  
  
  
  1.2       +7 -15     
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImpl.java
  
  Index: XidImpl.java
  ===================================================================
  RCS file: 
/home/cvs/incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImpl.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- XidImpl.java      23 Jan 2004 18:54:16 -0000      1.1
  +++ XidImpl.java      23 Feb 2004 20:28:43 -0000      1.2
  @@ -76,7 +76,7 @@
        */
       public XidImpl(byte[] globalId) {
           this.globalId = globalId;
  -        this.hash = hash(globalId);
  +        this.hash = hash(0, globalId);
           branchId = new byte[Xid.MAXBQUALSIZE];
       }
   
  @@ -85,28 +85,20 @@
        * @param global the xid of the global transaction this branch belongs to
        * @param branch the branch id
        */
  -    public XidImpl(Xid global, int branch) {
  +    public XidImpl(Xid global, byte[] branch) {
           int hash;
           if (global instanceof XidImpl) {
               globalId = ((XidImpl) global).globalId;
               hash = ((XidImpl) global).hash;
           } else {
               globalId = global.getGlobalTransactionId();
  -            hash = hash(globalId);
  +            hash = hash(0, globalId);
           }
  -        branchId = new byte[Xid.MAXBQUALSIZE];
  -        branchId[0] = (byte) branch;
  -        branchId[1] = (byte) (branch >>> 8);
  -        branchId[2] = (byte) (branch >>> 16);
  -        branchId[3] = (byte) (branch >>> 24);
  -        for (int i = 0; i < 4; i++) {
  -            hash = (hash * 37) + branchId[i];
  -        }
  -        this.hash = hash;
  +        branchId = branch;
  +        this.hash = hash(hash, branchId);
       }
   
  -    private int hash(byte[] id) {
  -        int hash = 0;
  +    private int hash(int hash, byte[] id) {
           for (int i = 0; i < id.length; i++) {
               hash = (hash * 37) + id[i];
           }
  
  
  
  1.1                  
incubator-geronimo/modules/transaction/src/java/org/apache/geronimo/transaction/manager/XidImporter.java
  
  Index: XidImporter.java
  ===================================================================
  package org.apache.geronimo.transaction.manager;
  
  import javax.transaction.xa.Xid;
  import javax.transaction.xa.XAException;
  import javax.transaction.Transaction;
  import javax.transaction.SystemException;
  
  /**
   *
   *
   * @version $Revision: 1.1 $ $Date: 2004/02/23 20:28:43 $
   *
   * */
  public interface XidImporter {
  
      Transaction importXid(Xid xid) throws XAException, SystemException;
  
      void commit(Transaction tx, boolean onePhase) throws XAException;
      void forget(Transaction tx) throws XAException;
      int prepare(Transaction tx) throws XAException;
      void rollback(Transaction tx) throws XAException;
      void setTransactionTimeout(long milliseconds);
  }
  
  
  

Reply via email to