Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9f7751422 -> 60a8b49b1


ARTEMIS-1570 Flush appendExecutor before taking journal snapshot

When the live server starts replication, it must make sure there are
no pending writes in the message & bindings journals, or we may
lose journal records during initial replication.

So we need to flush the append executor after acquiring the
StorageManager's write lock and before acquiring the Journal's write lock.
We also set a 10-second timeout when flushing, the same as
Journal::flushExecutor. If we fail to flush within 10 seconds,
we abort replication; the backup will try again later.

Use OrderedExecutorFactory::flushExecutor to flush the executor.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4bfb952
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4bfb952
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4bfb952

Branch: refs/heads/master
Commit: c4bfb9521fd322c7179d31d5b5f7acf3f25d32dd
Parents: 9f77514
Author: shoukun <shoukunh...@live.cn>
Authored: Wed Dec 27 10:23:33 2017 +0800
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Jan 18 13:16:27 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/core/journal/impl/JournalImpl.java      | 4 ++++
 .../core/persistence/impl/journal/JournalStorageManager.java | 8 ++++++++
 2 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4bfb952/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 5f31a2b..77bf9da 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -2237,6 +2237,10 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
 
    }
 
+   public boolean flushAppendExecutor(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return OrderedExecutorFactory.flushExecutor(appendExecutor, timeout, 
unit);
+   }
+
    @Override
    public int getDataFilesCount() {
       return filesRepository.getDataFilesCount();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4bfb952/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 87f4fc9..c54a3b7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -568,6 +568,14 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
                throw new ActiveMQIllegalStateException("already replicating");
             replicator = replicationManager;
 
+            if (!((JournalImpl) 
originalMessageJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) {
+               throw new Exception("Live message journal is busy");
+            }
+
+            if (!((JournalImpl) 
originalBindingsJournal).flushAppendExecutor(10, TimeUnit.SECONDS)) {
+               throw new Exception("Live bindings journal is busy");
+            }
+
             // Establishes lock
             originalMessageJournal.synchronizationLock();
             originalBindingsJournal.synchronizationLock();

Reply via email to