This is an automated email from the ASF dual-hosted git repository. andy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/jena.git
commit 098ceb3a706cfcfa4c73b5018081b8f2379593fa Author: Andy Seaborne <a...@apache.org> AuthorDate: Sun Mar 10 15:04:31 2024 +0000 GH-2318: Protect against shutdown removing the coordinatorLock. --- .../transaction/txn/TransactionCoordinator.java | 84 ++++++++++++++++++---- .../TestTransactionCoordinatorControl.java | 35 ++++++++- 2 files changed, 103 insertions(+), 16 deletions(-) diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java index b823287796..21fb6b2169 100644 --- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java +++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java @@ -369,19 +369,65 @@ public class TransactionCoordinator implements TransactionalSystemControl { }); } + /** + * Shutdown the coordinator; this operation does not check the state of the transaction system. + * <p> + * It expects the caller to decide when to shutdown; maybe waiting for all transactions + * to finish or maybe forcing the system to shutdown for some reason, causing later + * transaction exceptions. + * <p> + * Use inside exclusive mode to be sure no active transactions are present. + */ public void shutdown() { shutdown(false); } + /** + * Shutdown the coordinator; this operation does not check the state of the transaction system. + * <p> + * It expects the caller to decide when to shutdown; maybe waiting for all transactions + * to finish or maybe forcing the system to shutdown for some reason, causing later + * transaction exceptions. + * <p> + * Use inside exclusive mode to be sure no active transactions are present. 
+ */ public void shutdown(boolean silent) { - if ( coordinatorLock == null ) + // See also getCoordinatorLock() + var coordLock = coordinatorLock; + if ( coordLock == null ) return; - if ( ! silent && countActive() > 0 ) - FmtLog.warn(SysErr, "Transactions active: W=%d, R=%d", countActiveWriter(), countActiveReaders()); - components.forEach((id, c) -> c.shutdown()); - shutdownHooks.forEach((h)-> h.shutdown()); - coordinatorLock = null; - journal.close(); + synchronized(coordLock) { + // Check again + if ( coordinatorLock == null ) + return; + if ( ! silent && countActive() > 0 ) + FmtLog.warn(SysErr, "Transactions active: W=%d, R=%d", countActiveWriter(), countActiveReaders()); + components.forEach((id, c) -> c.shutdown()); + shutdownHooks.forEach((h)-> h.shutdown()); + coordinatorLock = null; + journal.close(); + } + } + + /** + * Get the coordinator lock in a safe way or throw a {@link TransactionException} + * <p> + * Another thread may call {@link #shutdown(boolean)} at any time. Using the + * coordinatorLock for a synchronized block may cause a + * {@code NullPointerException}. + * <p> + * Code may still need to call + * {@link #checkActive()} inside the synchronized block. + */ + private Object getCoordinatorLock() { + // Read once and then return. + var coordLock = coordinatorLock; + // The coordinatorLock can only go non-null to null. + // An operation using the coordinatorLock with synchronized{} needs a non-null value. + // If "coordLock" is null, then coordinatorLock was null and checkActive() will fail. + checkActive(); + // Non-null - synchronized{} will not NPE. + return coordLock; } @Override @@ -632,7 +678,10 @@ public class TransactionCoordinator implements TransactionalSystemControl { } private Transaction begin$(TxnType txnType) { - synchronized(coordinatorLock) { + // Read once. + var coordLock = getCoordinatorLock(); + // Async shutdown may have happened. + synchronized(coordLock) { // Inside the lock - check again. 
checkActive(); // Thread safe part of 'begin' @@ -703,11 +752,14 @@ public class TransactionCoordinator implements TransactionalSystemControl { private boolean promoteTxn$(Transaction transaction, boolean readCommittedPromotion) { // == Read committed path. + // Read once. + var coordLock = getCoordinatorLock(); if ( transaction.getTxnType() == TxnType.READ_COMMITTED_PROMOTE ) { if ( ! promotionWaitForWriters() ) return false; // Now single writer. - synchronized(coordinatorLock) { + synchronized(coordLock) { + checkActive(); try { transaction.promoteComponents(); // Because we want to see the new state of the data. @@ -736,7 +788,8 @@ public class TransactionCoordinator implements TransactionalSystemControl { return false; // Now a proto-writer. We need to confirm when inside the synchronized. - synchronized(coordinatorLock) { + synchronized(coordLock) { + checkActive(); // Not read committed. // Need to check the data version once we are the writer and all previous // writers have committed or aborted. @@ -826,7 +879,8 @@ public class TransactionCoordinator implements TransactionalSystemControl { } private void executeCommitWriter(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) { - synchronized(coordinatorLock) { + var coordLock = getCoordinatorLock(); + synchronized(coordLock) { try { // *** COMMIT POINT journal.writeJournal(JournalEntry.COMMIT); @@ -937,8 +991,8 @@ public class TransactionCoordinator implements TransactionalSystemControl { private AtomicLong activeWritersCount = new AtomicLong(0); private void startActiveTransaction(Transaction transaction) { - synchronized(coordinatorLock) { - // Use lock to ensure all the counters move together. + var coordLock = getCoordinatorLock(); + synchronized(coordLock) { // Use lock to ensure all the counters move together. // Thread safe - we have not let the Transaction object out yet. 
countBegin.incrementAndGet(); switch(transaction.getMode()) { @@ -957,8 +1011,8 @@ public class TransactionCoordinator implements TransactionalSystemControl { } private void finishActiveTransaction(Transaction transaction) { - synchronized(coordinatorLock) { - // Idempotent. + var coordLock = getCoordinatorLock(); + synchronized(coordLock) { // Idempotent. boolean x = activeTransactions.remove(transaction); if ( ! x ) return; diff --git a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java index ad2a8b8f63..3fb4a747c5 100644 --- a/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java +++ b/jena-db/jena-dboe-transaction/src/test/java/org/apache/jena/dboe/transaction/TestTransactionCoordinatorControl.java @@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.jena.atlas.lib.ThreadLib; import org.apache.jena.dboe.base.file.Location; -import org.apache.jena.system.Txn; import org.apache.jena.dboe.transaction.txn.Transaction; import org.apache.jena.dboe.transaction.txn.TransactionCoordinator; import org.apache.jena.dboe.transaction.txn.TransactionException; @@ -33,6 +32,7 @@ import org.apache.jena.dboe.transaction.txn.TransactionalBase; import org.apache.jena.query.TxnType; import org.apache.jena.system.ThreadAction; import org.apache.jena.system.ThreadTxn; +import org.apache.jena.system.Txn; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -153,5 +153,38 @@ public class TestTransactionCoordinatorControl { b = txnMgr.tryExclusiveMode(false); assertTrue(b); } + + public void txn_coord_shutdown_1() { + txnMgr.shutdown(); + txnMgr.shutdown(); + // And again in after(). 
+ } + + @Test(expected=TransactionException.class) + public void txn_coord_shutdown_2() { + Transaction txn = txnMgr.begin(TxnType.READ); + txnMgr.shutdown(true); + txn.commit(); + } + + @Test(expected=TransactionException.class) + public void txn_coord_shutdown_3() { + Transaction txn = txnMgr.begin(TxnType.WRITE); + txnMgr.shutdown(true); + txn.commit(); + } + + @Test(expected=TransactionException.class) + public void txn_coord_shutdown_4() { + txnMgr.shutdown(true); + txnMgr.begin(TxnType.READ); + } + + + @Test(expected=TransactionException.class) + public void txn_coord_shutdown_5() { + txnMgr.shutdown(true); + txnMgr.begin(TxnType.READ); + } }