Repository: activemq-artemis Updated Branches: refs/heads/master 9f7751422 -> 60a8b49b1
ARTEMIS-1570 Flush appendExecutor before take journal snapshot When the live server starts replication, it must make sure there are no pending writes in the message and bindings journals, or we may lose journal records during initial replication. So we need to flush the append executor after acquiring StorageManager's write lock and before acquiring the Journal's write lock. We also set a 10-second timeout when flushing, the same as Journal::flushExecutor. If we fail to flush within 10 seconds, we abort replication; the backup will try again later. Use OrderedExecutorFactory::flushExecutor to flush the executor. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4bfb952 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4bfb952 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4bfb952 Branch: refs/heads/master Commit: c4bfb9521fd322c7179d31d5b5f7acf3f25d32dd Parents: 9f77514 Author: shoukun <shoukunh...@live.cn> Authored: Wed Dec 27 10:23:33 2017 +0800 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Jan 18 13:16:27 2018 -0500 ---------------------------------------------------------------------- .../activemq/artemis/core/journal/impl/JournalImpl.java | 4 ++++ .../core/persistence/impl/journal/JournalStorageManager.java | 8 ++++++++ 2 files changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4bfb952/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 5f31a2b..77bf9da 100644 --- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } + public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws InterruptedException { + return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, unit); + } + @Override public int getDataFilesCount() { return filesRepository.getDataFilesCount(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4bfb952/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 87f4fc9..c54a3b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -568,6 +568,14 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw new ActiveMQIllegalStateException("already replicating"); replicator = replicationManager; + if (!((JournalImpl) originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live message journal is busy"); + } + + if (!((JournalImpl) originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) { + throw new Exception("Live bindings journal is busy"); + } + // Establishes lock originalMessageJournal.synchronizationLock(); originalBindingsJournal.synchronizationLock();