Repository: activemq-artemis Updated Branches: refs/heads/master ee6176c67 -> 50d83fb63
ARTEMIS-552 Replication target being finished can lead to instability on live https://issues.apache.org/jira/browse/ARTEMIS-552 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e658654 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e658654 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e658654 Branch: refs/heads/master Commit: 2e6586548b3a0d69f1ce2079a2da243af16a28c6 Parents: ee6176c Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Jun 1 15:34:57 2016 -0400 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Jun 6 16:28:51 2016 -0400 ---------------------------------------------------------------------- .../core/persistence/OperationContext.java | 12 +++ .../core/persistence/StorageManager.java | 2 + .../journal/AbstractJournalStorageManager.java | 9 +++ .../impl/journal/DummyOperationContext.java | 7 ++ .../impl/journal/JournalStorageManager.java | 4 + .../impl/journal/OperationContextImpl.java | 46 ++++++++++-- .../impl/nullpm/NullStorageManager.java | 10 +++ .../postoffice/impl/DuplicateIDCacheImpl.java | 2 +- .../server/impl/RemotingServiceImpl.java | 12 ++- .../core/replication/ReplicationManager.java | 72 +++++++++++++----- .../core/server/ActiveMQServerLogger.java | 5 ++ .../artemis/core/transaction/Transaction.java | 5 ++ .../core/transaction/impl/TransactionImpl.java | 78 ++++++++++++++++++-- .../transaction/impl/TransactionImplTest.java | 5 ++ .../extras/byteman/OrphanedConsumerTest.java | 4 - .../tests/integration/client/PagingTest.java | 5 ++ .../persistence/DuplicateCacheTest.java | 2 +- .../core/postoffice/impl/BindingsImplTest.java | 5 ++ 18 files changed, 246 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java index 6d64eb8..e893a10 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java @@ -29,9 +29,21 @@ public interface OperationContext extends IOCompletion { /** * Execute the task when all IO operations are complete, * Or execute it immediately if nothing is pending. + * @param runnable the task to be executed. + * @param storeOnly There are tasks that won't need to wait on replication or paging and will need to + * be completed as soon as the response from the journal is received. An example would be the + * DuplicateCache + */ + void executeOnCompletion(IOCallback runnable, boolean storeOnly); + + /** + * Execute the task when all IO operations are complete, + * Or execute it immediately if nothing is pending. + * @param runnable the task to be executed. 
*/ void executeOnCompletion(IOCallback runnable); + void replicationLineUp(); void replicationDone(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index a0a5200..f92d0d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -100,6 +100,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { void afterCompleteOperations(IOCallback run); + /** This is similar to afterComplete, however this only cares about the journal part. */ + void afterStoreOperations(IOCallback run); /** * Block until the operations are done. 
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index ff21fe2..ed2e1f4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -292,6 +292,10 @@ public abstract class AbstractJournalStorageManager implements StorageManager { getContext().executeOnCompletion(run); } + public void afterStoreOperations(IOCallback run) { + getContext().executeOnCompletion(run, true); + } + @Override public long generateID() { return idGenerator.generateID(); @@ -1789,6 +1793,11 @@ public abstract class AbstractJournalStorageManager implements StorageManager { } @Override + public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + executeOnCompletion(runnable); + } + + @Override public void replicationDone() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java index 
1ae7524..6fd95ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java @@ -35,6 +35,13 @@ final class DummyOperationContext implements OperationContext { } @Override + public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + // There are no executeOnCompletion calls while using the DummyOperationContext + // However we keep the code here for correctness + runnable.done(); + } + + @Override public void replicationDone() { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 1379308..157306e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -61,8 +61,10 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.jboss.logging.Logger; public class JournalStorageManager extends AbstractJournalStorageManager { + private static final Logger logger = Logger.getLogger(JournalStorageManager.class); private SequentialFileFactory journalFF; @@ -569,6 +571,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager { } } catch (Exception e) { + logger.warn(e.getMessage(), e); stopReplication(); throw e; } @@ -681,6 +684,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { */ @Override public void stopReplication() { + logger.trace("stopReplication()"); storageManagerLock.writeLock().lock(); try { if (replicator == null) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java index acd75b1..06e07f7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java @@ -72,6 +72,7 @@ public class OperationContextImpl implements OperationContext { } private List<TaskHolder> tasks; + private List<TaskHolder> storeOnlyTasks; private long minimalStore = Long.MAX_VALUE; private long minimalReplicated = Long.MAX_VALUE; @@ -126,7 +127,12 @@ public class OperationContextImpl implements OperationContext { } @Override - public void executeOnCompletion(final IOCallback completion) { + public void executeOnCompletion(IOCallback runnable) { + executeOnCompletion(runnable, false); + } + + @Override + public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) { if (errorCode != -1) { completion.onError(errorCode, errorMessage); return; @@ -135,11 +141,18 @@ public class OperationContextImpl implements OperationContext { boolean executeNow = false; synchronized (this) { - if (tasks == null) { - tasks = new 
LinkedList<>(); - minimalReplicated = replicationLineUp.intValue(); - minimalStore = storeLineUp.intValue(); - minimalPage = pageLineUp.intValue(); + if (storeOnly) { + if (storeOnlyTasks == null) { + storeOnlyTasks = new LinkedList<>(); + } + } + else { + if (tasks == null) { + tasks = new LinkedList<>(); + minimalReplicated = replicationLineUp.intValue(); + minimalStore = storeLineUp.intValue(); + minimalPage = pageLineUp.intValue(); + } } // On this case, we can just execute the context directly @@ -159,7 +172,12 @@ public class OperationContextImpl implements OperationContext { } } else { - tasks.add(new TaskHolder(completion)); + if (storeOnly) { + storeOnlyTasks.add(new TaskHolder(completion)); + } + else { + tasks.add(new TaskHolder(completion)); + } } } @@ -177,6 +195,20 @@ public class OperationContextImpl implements OperationContext { } private void checkTasks() { + + if (storeOnlyTasks != null) { + Iterator<TaskHolder> iter = storeOnlyTasks.iterator(); + while (iter.hasNext()) { + TaskHolder holder = iter.next(); + if (stored >= holder.storeLined) { + // If set, we use an executor to avoid the server being single threaded + execute(holder.task); + + iter.remove(); + } + } + } + if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage) { Iterator<TaskHolder> iter = tasks.iterator(); while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 39c5de5..21a9fd9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java 
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -94,6 +94,11 @@ public class NullStorageManager implements StorageManager { } @Override + public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + runnable.done(); + } + + @Override public void storeLineUp() { } @@ -339,6 +344,11 @@ public class NullStorageManager implements StorageManager { } @Override + public void afterStoreOperations(IOCallback run) { + run.done(); + } + + @Override public void waitOnOperations() throws Exception { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 7f35638..28896c3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -226,7 +226,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { } // For a tx, it's important that the entry is not added to the cache until commit // since if the client fails then resends them tx we don't want it to get rejected - tx.addOperation(new AddDuplicateIDOperation(duplID, recordID)); + tx.afterStore(new AddDuplicateIDOperation(duplID, recordID)); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 3672fe2..3a073e9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -690,9 +690,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif } for (Object id : idsToRemove) { - RemotingConnection conn = getConnection(id); + final RemotingConnection conn = getConnection(id); if (conn != null) { - conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress())); + // In certain cases (replicationManager for instance) calling fail could take some time + // We can't pause the FailureCheckAndFlushThread as that would lead other clients to fail for + // missing pings + flushExecutor.execute(new Runnable() { + @Override + public void run() { + conn.fail(ActiveMQMessageBundle.BUNDLE.clientExited(conn.getRemoteAddress())); + } + }); removeConnection(id); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 1abd9c6..58102d4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -265,13 +265,14 @@ public final class ReplicationManager implements ActiveMQComponent, 
ReadyListene } @Override - public synchronized void stop() throws Exception { - if (!started) { - return; + public void stop() throws Exception { + synchronized (this) { + if (!started) { + logger.trace("Stopping being ignored as it hasn't been started"); + return; + } } - enabled = false; - // This is to avoid the write holding a lock while we are trying to close it if (replicatingChannel != null) { replicatingChannel.close(); @@ -279,6 +280,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } synchronized (replicationLock) { + enabled = false; writable.set(true); replicationLock.notifyAll(); clearReplicationTokens(); @@ -299,9 +301,12 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene * backup crashing). */ public void clearReplicationTokens() { + logger.trace("clearReplicationTokens initiating"); synchronized (replicationLock) { + logger.trace("clearReplicationTokens entered the lock"); while (!pendingTokens.isEmpty()) { OperationContext ctx = pendingTokens.poll(); + logger.trace("Calling ctx.replicationDone()"); try { ctx.replicationDone(); } @@ -310,6 +315,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene } } } + logger.trace("clearReplicationTokens finished"); } /** @@ -347,20 +353,8 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene synchronized (replicationLock) { if (enabled) { pendingTokens.add(repliToken); - if (!replicatingChannel.getConnection().isWritable(this)) { - try { - writable.set(false); - //don't wait for ever as this may hang tests etc, we've probably been closed anyway - long now = System.currentTimeMillis(); - long deadline = now + 5000; - while (!writable.get() && now < deadline) { - replicationLock.wait(deadline - now); - now = System.currentTimeMillis(); - } - } - catch (InterruptedException e) { - throw new ActiveMQInterruptedException(e); - } + if (!flowControl()) { + return repliToken; } 
replicatingChannel.send(packet); } @@ -379,6 +373,43 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene return repliToken; } + /** This was written as a refactoring of sendReplicatePacket. + * If you refactor this in any way, this method must still be called while holding replicationLock. */ + private boolean flowControl() { + // synchronized (replicationLock) { -- I'm not adding this because the caller already has it + // future maintainers of this code please be aware that the intention here is to hold the lock on replicationLock + if (!replicatingChannel.getConnection().isWritable(this)) { + try { + logger.trace("flowControl waiting on writable"); + writable.set(false); + //don't wait for ever as this may hang tests etc, we've probably been closed anyway + long now = System.currentTimeMillis(); + long deadline = now + 5000; + while (!writable.get() && now < deadline) { + replicationLock.wait(deadline - now); + now = System.currentTimeMillis(); + } + logger.trace("flow control done"); + + if (!writable.get()) { + ActiveMQServerLogger.LOGGER.slowReplicationResponse(); + logger.tracef("There was no response from replication backup after %s seconds, server being stopped now", System.currentTimeMillis() - now); + try { + stop(); + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + } + return false; + } + } + catch (InterruptedException e) { + throw new ActiveMQInterruptedException(e); + } + } + return true; + } + @Override public void readyForWriting() { synchronized (replicationLock) { @@ -591,6 +622,7 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); try { if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) { + logger.trace("sendSynchronizationDone wasn't finished in time"); throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); } } @@ -598,6 +630,8 @@ public 
final class ReplicationManager implements ActiveMQComponent, ReadyListene logger.debug(e); } inSync = false; + + logger.trace("sendSynchronizationDone finished"); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6679008..3428a2f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1209,6 +1209,11 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 222206, value = "Connection limit of {0} reached. Refusing connection from {1}.", format = Message.Format.MESSAGE_FORMAT) void connectionLimitReached(long connectionsAllowed, String address); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222207, value = "The backup server is not responding promptly introducing latency beyond the limit. 
Replication server being disconnected now.", + format = Message.Format.MESSAGE_FORMAT) + void slowReplicationResponse(); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java index da87cbf..33c1eea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java @@ -67,6 +67,11 @@ public interface Transaction { void addOperation(TransactionOperation sync); + /** This is an operation that will be called right after the storage is completed. 
+ * addOperation could only happen after paging and replication, while these operations will just be + * about the storage*/ + void afterStore(TransactionOperation sync); + List<TransactionOperation> getAllOperations(); boolean hasTimedOut(long currentTime, int defaultTimeout); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java index 0a91562..185bfb2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.transaction.impl; import javax.transaction.xa.Xid; import java.util.ArrayList; import java.util.Date; +import java.util.LinkedList; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -38,6 +39,8 @@ public class TransactionImpl implements Transaction { private List<TransactionOperation> operations; + private List<TransactionOperation> storeOperations; + private static final int INITIAL_NUM_PROPERTIES = 10; private Object[] properties = new Object[TransactionImpl.INITIAL_NUM_PROPERTIES]; @@ -301,6 +304,24 @@ public class TransactionImpl implements Transaction { } }); + final List<TransactionOperation> storeOperationsToComplete = this.storeOperations; + this.storeOperations = null; + + if (storeOperationsToComplete != null) { + storageManager.afterStoreOperations(new IOCallback() { + + @Override + public void onError(final int errorCode, final String errorMessage) { + 
ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); + } + + @Override + public void done() { + afterCommit(storeOperationsToComplete); + } + }); + } + } } @@ -365,6 +386,9 @@ public class TransactionImpl implements Transaction { final List<TransactionOperation> operationsToComplete = this.operations; this.operations = null; + final List<TransactionOperation> storeOperationsToComplete = this.storeOperations; + this.storeOperations = null; + // We use the Callback even for non persistence // If we are using non-persistence with replication, the replication manager will have // to execute this runnable in the correct order @@ -380,6 +404,21 @@ public class TransactionImpl implements Transaction { afterRollback(operationsToComplete); } }); + + if (storeOperationsToComplete != null) { + storageManager.afterStoreOperations(new IOCallback() { + + @Override + public void onError(final int errorCode, final String errorMessage) { + ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage); + } + + @Override + public void done() { + afterRollback(storeOperationsToComplete); + } + }); + } } @Override @@ -445,6 +484,15 @@ public class TransactionImpl implements Transaction { operations.add(operation); } + + @Override + public synchronized void afterStore(TransactionOperation sync) { + if (storeOperations == null) { + storeOperations = new LinkedList<>(); + } + storeOperations.add(sync); + } + private int getOperationsCount() { checkCreateOperations(); @@ -491,7 +539,7 @@ public class TransactionImpl implements Transaction { private void checkCreateOperations() { if (operations == null) { - operations = new ArrayList<>(); + operations = new LinkedList<>(); } } @@ -505,13 +553,13 @@ public class TransactionImpl implements Transaction { } } - private synchronized void afterRollback(List<TransactionOperation> oeprationsToComplete) { - if (oeprationsToComplete != null) { - for (TransactionOperation operation : oeprationsToComplete) { + private synchronized void 
afterRollback(List<TransactionOperation> operationsToComplete) { + if (operationsToComplete != null) { + for (TransactionOperation operation : operationsToComplete) { operation.afterRollback(this); } // Help out GC here - oeprationsToComplete.clear(); + operationsToComplete.clear(); } } @@ -521,6 +569,11 @@ public class TransactionImpl implements Transaction { operation.beforeCommit(this); } } + if (storeOperations != null) { + for (TransactionOperation operation : storeOperations) { + operation.beforeCommit(this); + } + } } private synchronized void beforePrepare() throws Exception { @@ -529,6 +582,11 @@ public class TransactionImpl implements Transaction { operation.beforePrepare(this); } } + if (storeOperations != null) { + for (TransactionOperation operation : storeOperations) { + operation.beforePrepare(this); + } + } } private synchronized void beforeRollback() throws Exception { @@ -537,6 +595,11 @@ public class TransactionImpl implements Transaction { operation.beforeRollback(this); } } + if (storeOperations != null) { + for (TransactionOperation operation : storeOperations) { + operation.beforeRollback(this); + } + } } private synchronized void afterPrepare() { @@ -545,6 +608,11 @@ public class TransactionImpl implements Transaction { operation.afterPrepare(this); } } + if (storeOperations != null) { + for (TransactionOperation operation : storeOperations) { + operation.afterPrepare(this); + } + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index 6c5cfe5..9a66610 100644 --- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -253,6 +253,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override + public void afterStoreOperations(IOCallback run) { + run.done(); + } + + @Override public boolean waitOnOperations(long timeout) throws Exception { return false; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java index 211aee5..a95cbaa 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/OrphanedConsumerTest.java @@ -67,10 +67,6 @@ public class OrphanedConsumerTest extends ActiveMQTestBase { * This static method is an entry point for the byteman rules on {@link #testOrphanedConsumers()} */ public static void leavingCloseOnTestCountersWhileClosing() { - if (staticServer.getConnectionCount() == 0) { - verification = new AssertionError("The connection was closed before the consumers and sessions, this may cause issues on management leaving Orphaned Consumers!"); - } - if (staticServer.getSessions().size() == 0) { verification = new AssertionError("The session was closed before the consumers, this may cause issues on management leaving Orphaned Consumers!"); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java index f658fae..41467b5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java @@ -5730,7 +5730,12 @@ public class PagingTest extends ActiveMQTestBase { @Override public void executeOnCompletion(IOCallback runnable) { + runnable.done(); + } + @Override + public void executeOnCompletion(IOCallback runnable, boolean storeOnly) { + runnable.done(); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java index 38be202..299022c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/DuplicateCacheTest.java @@ -90,7 +90,7 @@ public class DuplicateCacheTest extends StorageManagerTestBase { public void onError(int errorCode, String errorMessage) { } - }); + }, true); Assert.assertTrue(latch.await(1, 
TimeUnit.MINUTES)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e658654/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 805a6f5..44b5d82 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -115,6 +115,11 @@ public class BindingsImplTest extends ActiveMQTestBase { } @Override + public void afterStore(TransactionOperation sync) { + + } + + @Override public void addOperation(final TransactionOperation sync) { }