ozeigermann 2004/07/05 03:55:21 Modified: src/stores/org/apache/slide/store/impl/rdbms RDBMSComparableResourcesPool.java AbstractRDBMSStore.java Log: Migration to unified XA stuff inherited from AbstractXAServiceBase Revision Changes Path 1.2 +4 -4 jakarta-slide/src/stores/org/apache/slide/store/impl/rdbms/RDBMSComparableResourcesPool.java Index: RDBMSComparableResourcesPool.java =================================================================== RCS file: /home/cvs/jakarta-slide/src/stores/org/apache/slide/store/impl/rdbms/RDBMSComparableResourcesPool.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- RDBMSComparableResourcesPool.java 4 Jul 2004 18:57:35 -0000 1.1 +++ RDBMSComparableResourcesPool.java 5 Jul 2004 10:55:21 -0000 1.2 @@ -133,7 +133,7 @@ } private ObjectNode[] retrieveObjects() throws ServiceAccessException, BadQueryException { - if (_store.getActiveTransactionContext() == null) { + if (_store.getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = _store.getNewConnection(); 1.6 +95 -250 jakarta-slide/src/stores/org/apache/slide/store/impl/rdbms/AbstractRDBMSStore.java Index: AbstractRDBMSStore.java =================================================================== RCS file: /home/cvs/jakarta-slide/src/stores/org/apache/slide/store/impl/rdbms/AbstractRDBMSStore.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- AbstractRDBMSStore.java 4 Jul 2004 18:57:35 -0000 1.5 +++ AbstractRDBMSStore.java 5 Jul 2004 10:55:21 -0000 1.6 @@ -34,6 +34,8 @@ import javax.transaction.xa.Xid; import org.apache.slide.common.AbstractXAService; +import org.apache.slide.common.AbstractXAServiceBase; +import org.apache.slide.common.Scope; import org.apache.slide.common.Service; import org.apache.slide.common.ServiceAccessException; import org.apache.slide.common.ServiceConnectionFailedException; @@ -67,6 +69,7 @@ import org.apache.slide.structure.ObjectAlreadyExistsException; import org.apache.slide.structure.ObjectNode; import org.apache.slide.structure.ObjectNotFoundException; +import org.apache.slide.transaction.TransactionalResource; import org.apache.slide.util.logger.Logger; /** @@ -80,7 +83,7 @@ * @version $Revision$ */ public abstract class AbstractRDBMSStore - extends AbstractXAService + extends AbstractXAServiceBase implements LockStore, NodeStore, @@ -91,15 +94,7 @@ SequenceStore, IBasicExpressionFactoryProvider { - protected String LOG_CHANNEL = this.getClass().getName(); - - protected static final int TX_IDLE = 0; - - protected static final int TX_PREPARED = 1; - - protected static final int TX_SUSPENDED = 2; - - protected ThreadLocal activeTransactionContext = new ThreadLocal(); + protected static final String LOG_CHANNEL = AbstractRDBMSStore.class.getName(); protected RDBMSAdapter adapter; @@ -167,7 +162,7 @@ * * @exception ServiceConnectionFailedException if the connection failed */ - public synchronized void connect() throws ServiceConnectionFailedException { + public void connect() throws ServiceConnectionFailedException { } /** @@ -191,10 +186,11 @@ * * @exception ServiceResetFailedException Reset failed */ - public synchronized void reset() throws ServiceResetFailedException { + public void reset() throws ServiceResetFailedException { } + // ----------------------------------------------------- XAResource Methods /** @@ -216,227 +212,18 @@ public Xid[] recover(int flag) throws XAException { getLogger().log("recover() for thread: " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - TransactionId id = getActiveTransactionContext(); + TransactionalResource id = getCurrentlyActiveTransactionalResource(); - if (id != null && id.status == TX_PREPARED) { + if (id != null && id.getStatus() == STATUS_PREPARED) { Xid[] xids = new Xid[1]; - xids[0] = id.xid; + xids[0] = id.getXid(); return xids; } else return new Xid[0]; } - public int prepare(Xid xid) throws XAException { - - getLogger().log("prepare() for thread: " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - TransactionId id = getActiveTransactionContext(); - - if (id == null) - throw new XAException(XAException.XAER_NOTA); - if (xid == null) - throw new XAException(XAException.XAER_INVAL); - - if (id.status != TX_IDLE && id.status != TX_SUSPENDED) - throw new XAException(XAException.XAER_PROTO); - - if (id.rollbackOnly) - throw new XAException(XAException.XA_RBROLLBACK); - - id.status = TX_PREPARED; - - return XAResource.XA_OK; - } - public boolean isSameRM(XAResource xares) throws XAException { - if (xares == null) - return false; - else - return this == xares; - } - - public void forget(Xid xid) throws XAException { - - getLogger().log("forget() for thread: " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - TransactionId id = getActiveTransactionContext(); - - if (id == null || id.xid == null) - throw new XAException(XAException.XAER_NOTA); - - if (xid == null) - throw new XAException(XAException.XAER_INVAL); - - try { - id.connection.close(); - } catch (SQLException e) { - getLogger().log("Couldn't close connection.", LOG_CHANNEL, Logger.ERROR); - } - getLogger().log("forget(): removing from map: " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - activeTransactionContext.set(null); - } - - public void end(Xid xid, int flags) throws XAException { - - getLogger().log("end() for thread: " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - TransactionId id = getActiveTransactionContext(); - if (id == null || id.xid == null) - throw new XAException(XAException.XAER_NOTA); - if (xid == null) - throw new XAException(XAException.XAER_INVAL); - - if (flags == XAResource.TMSUSPEND) - id.status = TX_SUSPENDED; - - if (flags == XAResource.TMFAIL) - id.rollbackOnly = true; - - } - - /** - * Commit the global transaction specified by xid. - */ - public void commit(Xid xid, boolean onePhase) throws XAException { - - getLogger().log( - "commit() for thread " + Thread.currentThread() + ", removing from map", - LOG_CHANNEL, - Logger.DEBUG); - - TransactionId id = getActiveTransactionContext(); - if (id == null) { - getLogger().log( - "Error committing: no transaction associated with current thread", - LOG_CHANNEL, - Logger.ERROR); - throw new XAException(XAException.XAER_NOTA); - } - if (xid == null) - throw new XAException(XAException.XAER_INVAL); - - if (!onePhase && id.status != TX_PREPARED) - throw new XAException(XAException.XAER_PROTO); - if (onePhase && (!(id.status == TX_IDLE || id.status == TX_SUSPENDED))) - throw new XAException(XAException.XAER_PROTO); - - Connection conn = id.connection; - - if (conn == null) { - getLogger().log( - "commit(): No connection in connectionMap for id \"" + id + "\"", - LOG_CHANNEL, - Logger.ERROR); - throw new XAException(XAException.XAER_NOTA); - } - - try { - if (!tmCommits) { - if (id.rollbackOnly) { - - conn.rollback(); - } else { - conn.commit(); - } - } - activeTransactionContext.set(null); - } catch (Exception e) { - throw new XAException(XAException.XA_RBCOMMFAIL); - } finally { - try { - conn.close(); - /* We must always return connections to the pool, - or we'd eventually run out. */ - } catch (SQLException e) { - getLogger().log(e, LOG_CHANNEL, Logger.ERROR); - } - } - } - - /** - * Inform the resource manager to roll back work done on behalf of a - * transaction branch. - */ - public void rollback(Xid xid) throws XAException { - - getLogger().log( - "rollback() for thread " + Thread.currentThread() + ", removing from map", - LOG_CHANNEL, - Logger.DEBUG); - - TransactionId id = getActiveTransactionContext(); - if (id == null) { - getLogger().log("No transaction associated with current thread, can't rollback", LOG_CHANNEL, Logger.ERROR); - throw new XAException(XAException.XAER_NOTA); - } - - Connection conn = id.connection; - if (conn == null) { - getLogger().log( - "rollback(): No connection in connectionMap for id \"" + id + "\"", - LOG_CHANNEL, - Logger.ERROR); - throw new XAException(XAException.XAER_NOTA); - } - - try { - if (!tmCommits) { - conn.rollback(); - } - activeTransactionContext.set(null); - - } catch (Exception e) { - throw new XAException(XAException.XA_HEURCOM); - } finally { - try { - conn.close(); - /* We must always return connections to the pool, - or we'd eventually run out. */ - } catch (SQLException e) { - getLogger().log(e, LOG_CHANNEL, Logger.ERROR); - } - } - } - - /** - * Start work on behalf of a transaction branch specified in xid. - */ - public void start(Xid xid, int flags) throws XAException { - getLogger().log("start(): beginning transaction with xid " + xid, LOG_CHANNEL, Logger.DEBUG); - - TransactionId id = getActiveTransactionContext(); - - switch (flags) { - case XAResource.TMNOFLAGS : - if (id != null) - throw new XAException(XAException.XAER_INVAL); - try { - id = new TransactionId(xid, TX_IDLE); - } catch (SQLException e) { - throw new XAException(XAException.XAER_RMFAIL); // XXX or is it an error? - } - - getLogger().log("start(): adding to map for " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - - activeTransactionContext.set(id); - break; - case XAResource.TMJOIN : - getLogger().log( - "TMJOIN for transaction in thread: " + Thread.currentThread(), - LOG_CHANNEL, - Logger.DEBUG); - if (id == null) - throw new XAException(XAException.XAER_NOTA); - break; - case XAResource.TMRESUME : - getLogger().log( - "TMRESUME for transaction in thread: " + Thread.currentThread(), - LOG_CHANNEL, - Logger.DEBUG); - if (id == null) - throw new XAException(XAException.XAER_NOTA); - if (id.status != TX_SUSPENDED) - throw new XAException(XAException.XAER_INVAL); - id.status = TX_IDLE; - break; - } + return (xares == this); } // ----------------------------------------------- IBasicExpressionFactoryProvider Implementation @@ -492,7 +279,7 @@ // not change will running, it is either yes or no all the time if (isSequenceSupported > 0) return true; - if (isSequenceSupported > 0) + if (isSequenceSupported < 0) return false; Connection connection = null; try { @@ -590,7 +377,7 @@ */ public ObjectNode retrieveObject(Uri uri) throws ServiceAccessException, ObjectNotFoundException { - if (getActiveTransactionContext() == null) { + if (getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = getNewConnection(); @@ -691,7 +478,7 @@ * @exception ServiceAccessException Error accessing the Service */ public Enumeration enumeratePermissions(Uri uri) throws ServiceAccessException { - if (getActiveTransactionContext() == null) { + if (getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = getNewConnection(); @@ -769,7 +556,7 @@ * @exception ServiceAccessException Service access error */ public Enumeration enumerateLocks(Uri uri) throws ServiceAccessException { - if (getActiveTransactionContext() == null) { + if (getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = getNewConnection(); @@ -802,7 +589,7 @@ */ public NodeRevisionDescriptors retrieveRevisionDescriptors(Uri uri) throws ServiceAccessException, RevisionDescriptorNotFoundException { - if (getActiveTransactionContext() == null) { + if (getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = getNewConnection(); @@ -872,7 +659,7 @@ */ public NodeRevisionDescriptor retrieveRevisionDescriptor(Uri uri, NodeRevisionNumber revisionNumber) throws ServiceAccessException, RevisionDescriptorNotFoundException { - if (getActiveTransactionContext() == null) { + if (getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = getNewConnection(); @@ -943,7 +730,7 @@ */ public NodeRevisionContent retrieveRevisionContent(Uri uri, NodeRevisionDescriptor revisionDescriptor) throws ServiceAccessException, RevisionNotFoundException { - if (getActiveTransactionContext() == null) { + if (getCurrentlyActiveTransactionalResource() == null) { Connection connection = null; try { connection = getNewConnection(); @@ -1013,13 +800,18 @@ // ------------------------------------------------------ Protected Methods + // XXX just for visibility in RDBMSComparableResourcesPool + protected TransactionalResource getCurrentlyActiveTransactionalResource() { + return super.getCurrentlyActiveTransactionalResource(); + } + /** * Get the Connection object associated with the current transaction. */ protected Connection getCurrentConnection() throws ServiceAccessException { getLogger().log("Getting current connection for thread " + Thread.currentThread(), LOG_CHANNEL, Logger.DEBUG); - TransactionId id = getActiveTransactionContext(); + TransactionId id = (TransactionId) getCurrentlyActiveTransactionalResource(); if (id == null) { getLogger().log("No id for current thread - called outside transaction?", LOG_CHANNEL, Logger.DEBUG); return null; @@ -1027,28 +819,81 @@ return id.connection; } - protected TransactionId getActiveTransactionContext() { - Object txId = activeTransactionContext.get(); - return (TransactionId) txId; - } - abstract protected Connection getNewConnection() throws SQLException; - // ---------------------------------------------------------- Inner Classes - - private class TransactionId { + protected TransactionalResource createTransactionResource(Xid xid) throws SQLException { + return new TransactionId(xid); + } + + private class TransactionId implements TransactionalResource { Xid xid; int status; - boolean rollbackOnly; Connection connection; - TransactionId(Xid xid, int status) throws SQLException { + TransactionId(Xid xid) throws SQLException { this.xid = xid; - this.status = status; - this.rollbackOnly = false; - + + status = STATUS_ACTIVE; connection = getNewConnection(); } + + public void commit() throws XAException { + try { + if (!tmCommits) { + connection.commit(); + } + } catch (SQLException e) { + throw new XAException(XAException.XA_RBCOMMFAIL); + } finally { + try { + connection.close(); + } catch (SQLException e) { + getLogger().log(e, LOG_CHANNEL, Logger.WARNING); + } + } + } + + public void rollback() throws XAException { + try { + if (!tmCommits) { + connection.rollback(); + } + } catch (SQLException e) { + throw new XAException(XAException.XA_RBCOMMFAIL); + } finally { + try { + connection.close(); + } catch (SQLException e) { + getLogger().log(e, LOG_CHANNEL, Logger.WARNING); + } + } + } + + public int prepare() throws XAException { + // no check possible + return XA_OK; + } + + public void close() throws XAException { + try { + connection.close(); + } catch (SQLException e) { + getLogger().log(e, LOG_CHANNEL, Logger.WARNING); + // do not report, but gracefully ignore + } + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public Xid getXid() { + return xid; + } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]