http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64f74acd/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java new file mode 100644 index 0000000..be457c4 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -0,0 +1,1828 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.core.persistence.impl.journal; + +import javax.transaction.xa.Xid; +import java.io.File; +import java.io.FileInputStream; +import java.security.AccessController; +import java.security.DigestInputStream; +import java.security.InvalidParameterException; +import java.security.MessageDigest; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; +import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.journal.JournalLoadInformation; +import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; +import org.apache.activemq.artemis.core.journal.RecordInfo; +import org.apache.activemq.artemis.core.paging.PageTransactionInfo; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.paging.cursor.PagePosition; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; 
+import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl; +import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl; +import org.apache.activemq.artemis.core.persistence.GroupingInfo; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; +import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting; +import org.apache.activemq.artemis.core.persistence.config.PersistedRoles; +import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.GroupingEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding; +import 
org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding; +import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RouteContextList; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; +import org.apache.activemq.artemis.core.server.impl.JournalLoader; +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.transaction.ResourceManager; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; +import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.Base64; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.IDGenerator; + +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; +import static 
/**
 * Identifies which of the two journals a record belongs to. The type byte is
 * the value persisted/transferred on the wire, so the byte-to-constant mapping
 * must stay stable.
 */
public enum JournalContent {
   BINDINGS((byte) 0), MESSAGES((byte) 1);

   /** The persisted type byte for this journal. */
   public final byte typeByte;

   JournalContent(byte b) {
      typeByte = b;
   }

   /**
    * Resolves the journal whose type byte matches {@code type}.
    *
    * @param type the persisted type byte
    * @return the matching {@link JournalContent}
    * @throws InvalidParameterException if no journal uses that byte
    */
   public static JournalContent getType(byte type) {
      for (JournalContent content : values()) {
         if (content.typeByte == type) {
            return content;
         }
      }
      throw new InvalidParameterException("invalid byte: " + type);
   }
}
/**
 * Creates a storage manager with no critical-error listener.
 *
 * @param config          broker configuration (journal sync flags are read from it)
 * @param executorFactory used to create executors for operation contexts
 */
public AbstractJournalStorageManager(final Configuration config, final ExecutorFactory executorFactory) {
   this(config, executorFactory, null);
}

/**
 * Creates a storage manager.
 * <p>
 * NOTE(review): the abstract {@link #init} template method is invoked from this
 * constructor, i.e. before {@code idGenerator} is assigned and before any
 * subclass constructor body has run. Subclass {@code init} implementations must
 * therefore not rely on their own constructor-initialized state.
 *
 * @param config                broker configuration
 * @param executorFactory       used to create executors for operation contexts
 * @param criticalErrorListener notified from {@link #criticalError}; may be null (see the
 *                              two-arg constructor)
 */
public AbstractJournalStorageManager(Configuration config,
                                     ExecutorFactory executorFactory,
                                     IOCriticalErrorListener criticalErrorListener) {
   this.executorFactory = executorFactory;

   this.ioCriticalErrorListener = criticalErrorListener;

   this.config = config;

   executor = executorFactory.getExecutor();

   // Sync behaviour for all subsequent journal appends is fixed at construction time.
   syncNonTransactional = config.isJournalSyncNonTransactional();
   syncTransactional = config.isJournalSyncTransactional();

   // Template method: subclasses set up their journals/stores here.
   init(config, criticalErrorListener);

   idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, this);
}
+ * @param config + * @param criticalErrorListener + */ + protected abstract void init(Configuration config, IOCriticalErrorListener criticalErrorListener); + + @Override + public void criticalError(Throwable error) { + ioCriticalErrorListener.onIOException(error, error.getMessage(), null); + } + + @Override + public void clearContext() { + OperationContextImpl.clearContext(); + } + + public static String md5(File file) { + try { + byte[] buffer = new byte[1 << 4]; + MessageDigest md = MessageDigest.getInstance("MD5"); + + FileInputStream is = new FileInputStream(file); + DigestInputStream is2 = new DigestInputStream(is, md); + while (is2.read(buffer) > 0) { + continue; + } + byte[] digest = md.digest(); + is.close(); + is2.close(); + return Base64.encodeBytes(digest); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public IDGenerator getIDGenerator() { + return idGenerator; + } + + @Override + public final void waitOnOperations() throws Exception { + if (!started) { + ActiveMQServerLogger.LOGGER.serverIsStopped(); + throw new IllegalStateException("Server is stopped"); + } + waitOnOperations(0); + } + + @Override + public final boolean waitOnOperations(final long timeout) throws Exception { + if (!started) { + ActiveMQServerLogger.LOGGER.serverIsStopped(); + throw new IllegalStateException("Server is stopped"); + } + return getContext().waitCompletion(timeout); + } + + public OperationContext getContext() { + return OperationContextImpl.getContext(executorFactory); + } + + public void setContext(final OperationContext context) { + OperationContextImpl.setContext(context); + } + + public Executor getSingleThreadExecutor() { + return singleThreadExecutor; + } + + public OperationContext newSingleThreadContext() { + return newContext(singleThreadExecutor); + } + + public OperationContext newContext(final Executor executor1) { + return new OperationContextImpl(executor1); + } + + public void afterCompleteOperations(final IOCallback run) { + 
/**
 * Appends the message body to the message journal (non-transactionally).
 * Large messages are stored as a {@link LargeMessageEncoding} under the
 * ADD_LARGE_MESSAGE record type; regular messages are stored directly under
 * ADD_MESSAGE. Performed under the storage-manager read lock.
 *
 * @param message must already have a positive message ID assigned
 * @throws Exception if the message ID was never assigned, or on journal failure
 */
public void storeMessage(final ServerMessage message) throws Exception {
   if (message.getMessageID() <= 0) {
      // Sanity check only... this shouldn't happen unless there is a bug
      throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned();
   }

   readLock();
   try {
      // Note that we don't sync, the add reference that comes immediately after will sync if
      // appropriate

      if (message.isLargeMessage()) {
         messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, new LargeMessageEncoding((LargeServerMessage) message), false, getContext(false));
      }
      else {
         messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message, false, getContext(false));
      }
   }
   finally {
      readUnLock();
   }
}
/**
 * Records (non-transactionally) the scheduled delivery time for a message
 * reference as an update record against the message's journal entry.
 *
 * @param ref carries both the delivery time and the owning queue's ID
 */
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception {
   ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID());
   readLock();
   try {
      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, getContext(syncNonTransactional));
   }
   finally {
      readUnLock();
   }
}

/**
 * Persists a duplicate-ID cache entry (non-transactionally) so duplicate
 * detection survives a restart.
 *
 * @param address  the address the duplicate ID belongs to
 * @param duplID   the duplicate-detection ID bytes
 * @param recordID journal record ID under which the entry is stored
 */
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception {
   readLock();
   try {
      DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);

      messageJournal.appendAddRecord(recordID, JournalRecordIds.DUPLICATE_ID, encoding, syncNonTransactional, getContext(syncNonTransactional));
   }
   finally {
      readUnLock();
   }
}

/**
 * Deletes a previously stored duplicate-ID record (non-transactionally).
 *
 * @param recordID the journal record ID used when the entry was stored
 */
public void deleteDuplicateID(final long recordID) throws Exception {
   readLock();
   try {
      messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
   }
   finally {
      readUnLock();
   }
}
message.getMessageID(), JournalRecordIds.ADD_MESSAGE, message); + } + + } + finally { + readUnLock(); + } + } + + public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception { + readLock(); + try { + pageTransaction.setRecordID(generateID()); + messageJournal.appendAddRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, pageTransaction); + } + finally { + readUnLock(); + } + } + + public void updatePageTransaction(final long txID, + final PageTransactionInfo pageTransaction, + final int depages) throws Exception { + readLock(); + try { + messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages)); + } + finally { + readUnLock(); + } + } + + public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception { + readLock(); + try { + messageJournal.appendUpdateRecord(pageTransaction.getRecordID(), JournalRecordIds.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages), syncNonTransactional, getContext(syncNonTransactional)); + } + finally { + readUnLock(); + } + } + + public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception { + readLock(); + try { + messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID)); + } + finally { + readUnLock(); + } + } + + public void storeAcknowledgeTransactional(final long txID, + final long queueID, + final long messageID) throws Exception { + readLock(); + try { + messageJournal.appendUpdateRecordTransactional(txID, messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID)); + } + finally { + readUnLock(); + } + } + + public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) 
throws Exception { + readLock(); + try { + long ackID = idGenerator.generateID(); + position.setRecordID(ackID); + messageJournal.appendAddRecordTransactional(txID, ackID, JournalRecordIds.ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position)); + } + finally { + readUnLock(); + } + } + + public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception { + long recordID = idGenerator.generateID(); + position.setRecordID(recordID); + messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COMPLETE, new CursorAckRecordEncoding(queueID, position)); + } + + public void deletePageComplete(long ackID) throws Exception { + messageJournal.appendDeleteRecord(ackID, false); + } + + public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception { + readLock(); + try { + messageJournal.appendDeleteRecordTransactional(txID, ackID); + } + finally { + readUnLock(); + } + } + + public void deleteCursorAcknowledge(long ackID) throws Exception { + messageJournal.appendDeleteRecord(ackID, false); + } + + public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception { + readLock(); + try { + long id = generateID(); + + messageJournal.appendAddRecord(id, JournalRecordIds.HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext(true)); + return id; + } + finally { + readUnLock(); + } + } + + public void deleteHeuristicCompletion(final long id) throws Exception { + readLock(); + try { + + messageJournal.appendDeleteRecord(id, true, getContext(true)); + } + finally { + readUnLock(); + } + } + + public void deletePageTransactional(final long recordID) throws Exception { + readLock(); + try { + messageJournal.appendDeleteRecord(recordID, false); + } + finally { + readUnLock(); + } + } + + public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception { + 
/**
 * Appends a commit record for the given transaction to the message journal.
 *
 * @param txID          the transaction being committed
 * @param lineUpContext when {@code true}, this call lines up the operation
 *                      context itself; when {@code false} the caller has
 *                      already lined it up elsewhere (see the inline comment)
 */
public void commit(final long txID, final boolean lineUpContext) throws Exception {
   readLock();
   try {
      messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
      if (!lineUpContext && !syncTransactional) {
         /**
          * If {@code lineUpContext == false}, it means that we have previously lined up a
          * context somewhere else (specifically see @{link TransactionImpl#asyncAppendCommit}),
          * hence we need to mark it as done even if {@code syncTransactional = false} as in this
          * case {@code getContext(syncTransactional=false)} would pass a dummy context to the
          * {@code messageJournal.appendCommitRecord(...)} call above.
          */
         getContext(true).done();
      }
   }
   finally {
      readUnLock();
   }
}
getContext(syncNonTransactional)); + } + finally { + readUnLock(); + } + } + + public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception { + deleteAddressSetting(addressSetting.getAddressMatch()); + readLock(); + try { + long id = idGenerator.generateID(); + addressSetting.setStoreId(id); + bindingsJournal.appendAddRecord(id, JournalRecordIds.ADDRESS_SETTING_RECORD, addressSetting, true); + mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting); + } + finally { + readUnLock(); + } + } + + public List<PersistedAddressSetting> recoverAddressSettings() throws Exception { + return new ArrayList<>(mapPersistedAddressSettings.values()); + } + + public List<PersistedRoles> recoverPersistedRoles() throws Exception { + return new ArrayList<>(mapPersistedRoles.values()); + } + + public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception { + + deleteSecurityRoles(persistedRoles.getAddressMatch()); + readLock(); + try { + final long id = idGenerator.generateID(); + persistedRoles.setStoreId(id); + bindingsJournal.appendAddRecord(id, JournalRecordIds.SECURITY_RECORD, persistedRoles, true); + mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles); + } + finally { + readUnLock(); + } + } + + @Override + public void storeID(final long journalID, final long id) throws Exception { + readLock(); + try { + bindingsJournal.appendAddRecord(journalID, JournalRecordIds.ID_COUNTER_RECORD, BatchingIDGenerator.createIDEncodingSupport(id), true); + } + finally { + readUnLock(); + } + } + + @Override + public void deleteID(long journalD) throws Exception { + readLock(); + try { + bindingsJournal.appendDeleteRecord(journalD, false); + } + finally { + readUnLock(); + } + } + + public void deleteAddressSetting(SimpleString addressMatch) throws Exception { + PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); + if (oldSetting != null) { + readLock(); + try { + 
bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false); + } + finally { + readUnLock(); + } + } + } + + public void deleteSecurityRoles(SimpleString addressMatch) throws Exception { + PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch); + if (oldRoles != null) { + readLock(); + try { + bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false); + } + finally { + readUnLock(); + } + } + } + + @Override + public JournalLoadInformation loadMessageJournal(final PostOffice postOffice, + final PagingManager pagingManager, + final ResourceManager resourceManager, + Map<Long, QueueBindingInfo> queueInfos, + final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap, + final Set<Pair<Long, Long>> pendingLargeMessages, + List<PageCountPending> pendingNonTXPageCounter, + final JournalLoader journalLoader) throws Exception { + List<RecordInfo> records = new ArrayList<>(); + + List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>(); + + Map<Long, ServerMessage> messages = new HashMap<>(); + readLock(); + try { + + JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(this, messages)); + + ArrayList<LargeServerMessage> largeMessages = new ArrayList<>(); + + Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<>(); + + Map<Long, PageSubscription> pageSubscriptions = new HashMap<>(); + + final int totalSize = records.size(); + + for (int reccount = 0; reccount < totalSize; reccount++) { + // It will show log.info only with large journals (more than 1 million records) + if (reccount > 0 && reccount % 1000000 == 0) { + long percent = (long) ((((double) reccount) / ((double) totalSize)) * 100f); + + ActiveMQServerLogger.LOGGER.percentLoaded(percent); + } + + RecordInfo record = records.get(reccount); + byte[] data = record.data; + + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + + byte recordType = record.getUserRecordType(); + + switch (recordType) 
{ + case JournalRecordIds.ADD_LARGE_MESSAGE_PENDING: { + PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding(); + + pending.decode(buff); + + if (pendingLargeMessages != null) { + // it could be null on tests, and we don't need anything on that case + pendingLargeMessages.add(new Pair<>(record.id, pending.largeMessageID)); + } + break; + } + case JournalRecordIds.ADD_LARGE_MESSAGE: { + LargeServerMessage largeMessage = parseLargeMessage(messages, buff); + + messages.put(record.id, largeMessage); + + largeMessages.add(largeMessage); + + break; + } + case JournalRecordIds.ADD_MESSAGE: { + ServerMessage message = new ServerMessageImpl(record.id, 50); + + message.decode(buff); + + messages.put(record.id, message); + + break; + } + case JournalRecordIds.ADD_REF: { + long messageID = record.id; + + RefEncoding encoding = new RefEncoding(); + + encoding.decode(buff); + + Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID); + + if (queueMessages == null) { + queueMessages = new LinkedHashMap<>(); + + queueMap.put(encoding.queueID, queueMessages); + } + + ServerMessage message = messages.get(messageID); + + if (message == null) { + ActiveMQServerLogger.LOGGER.cannotFindMessage(record.id); + } + else { + queueMessages.put(messageID, new AddMessageRecord(message)); + } + + break; + } + case JournalRecordIds.ACKNOWLEDGE_REF: { + long messageID = record.id; + + RefEncoding encoding = new RefEncoding(); + + encoding.decode(buff); + + Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID); + + if (queueMessages == null) { + ActiveMQServerLogger.LOGGER.journalCannotFindQueue(encoding.queueID, messageID); + } + else { + AddMessageRecord rec = queueMessages.remove(messageID); + + if (rec == null) { + ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID); + } + } + + break; + } + case JournalRecordIds.UPDATE_DELIVERY_COUNT: { + long messageID = record.id; + + DeliveryCountUpdateEncoding encoding = new 
DeliveryCountUpdateEncoding(); + + encoding.decode(buff); + + Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID); + + if (queueMessages == null) { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueDelCount(encoding.queueID); + } + else { + AddMessageRecord rec = queueMessages.get(messageID); + + if (rec == null) { + ActiveMQServerLogger.LOGGER.journalCannotFindMessageDelCount(messageID); + } + else { + rec.setDeliveryCount(encoding.count); + } + } + + break; + } + case JournalRecordIds.PAGE_TRANSACTION: { + if (record.isUpdate) { + PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding(); + + pageUpdate.decode(buff); + + PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX); + + pageTX.onUpdate(pageUpdate.recods, null, null); + } + else { + PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl(); + + pageTransactionInfo.decode(buff); + + pageTransactionInfo.setRecordID(record.id); + + pagingManager.addTransaction(pageTransactionInfo); + } + + break; + } + case JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME: { + long messageID = record.id; + + ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(); + + encoding.decode(buff); + + Map<Long, AddMessageRecord> queueMessages = queueMap.get(encoding.queueID); + + if (queueMessages == null) { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueScheduled(encoding.queueID, messageID); + } + else { + + AddMessageRecord rec = queueMessages.get(messageID); + + if (rec == null) { + ActiveMQServerLogger.LOGGER.cannotFindMessage(messageID); + } + else { + rec.setScheduledDeliveryTime(encoding.scheduledDeliveryTime); + } + } + + break; + } + case JournalRecordIds.DUPLICATE_ID: { + DuplicateIDEncoding encoding = new DuplicateIDEncoding(); + + encoding.decode(buff); + + List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address); + + if (ids == null) { + ids = new ArrayList<>(); + + duplicateIDMap.put(encoding.address, ids); + } + + 
ids.add(new Pair<>(encoding.duplID, record.id)); + + break; + } + case JournalRecordIds.HEURISTIC_COMPLETION: { + HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding(); + encoding.decode(buff); + resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit); + break; + } + case JournalRecordIds.ACKNOWLEDGE_CURSOR: { + CursorAckRecordEncoding encoding = new CursorAckRecordEncoding(); + encoding.decode(buff); + + encoding.position.setRecordID(record.id); + + PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.reloadACK(encoding.position); + } + else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloading(encoding.queueID); + messageJournal.appendDeleteRecord(record.id, false); + + } + + break; + } + case JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE: { + PageCountRecord encoding = new PageCountRecord(); + + encoding.decode(buff); + + PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.getCounter().loadValue(record.id, encoding.getValue()); + } + else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID()); + messageJournal.appendDeleteRecord(record.id, false); + } + + break; + } + + case JournalRecordIds.PAGE_CURSOR_COUNTER_INC: { + PageCountRecordInc encoding = new PageCountRecordInc(); + + encoding.decode(buff); + + PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.getCounter().loadInc(record.id, encoding.getValue()); + } + else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID()); + messageJournal.appendDeleteRecord(record.id, false); + } + + break; + } + + case JournalRecordIds.PAGE_CURSOR_COMPLETE: { + CursorAckRecordEncoding encoding = new CursorAckRecordEncoding(); + 
encoding.decode(buff); + + encoding.position.setRecordID(record.id); + + PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.reloadPageCompletion(encoding.position); + } + else { + ActiveMQServerLogger.LOGGER.cantFindQueueOnPageComplete(encoding.queueID); + messageJournal.appendDeleteRecord(record.id, false); + } + + break; + } + + case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: { + + PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl(); + pendingCountEncoding.decode(buff); + pendingCountEncoding.setID(record.id); + + // This can be null on testcases not interested on this outcome + if (pendingNonTXPageCounter != null) { + pendingNonTXPageCounter.add(pendingCountEncoding); + } + break; + } + + default: { + throw new IllegalStateException("Invalid record type " + recordType); + } + } + + // This will free up memory sooner. The record is not needed any more + // and its byte array would consume memory during the load process even though it's not necessary any longer + // what would delay processing time during load + records.set(reccount, null); + } + + // Release the memory as soon as not needed any longer + records.clear(); + records = null; + + journalLoader.handleAddMessage(queueMap); + + loadPreparedTransactions(postOffice, pagingManager, resourceManager, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions, pendingLargeMessages, journalLoader); + + for (PageSubscription sub : pageSubscriptions.values()) { + sub.getCounter().processReload(); + } + + for (LargeServerMessage msg : largeMessages) { + if (msg.getRefCount() == 0) { + ActiveMQServerLogger.LOGGER.largeMessageWithNoRef(msg.getMessageID()); + msg.decrementDelayDeletionCount(); + } + } + + journalLoader.handleNoMessageReferences(messages); + + // To recover positions on Iterators + if (pagingManager != null) { + // it could be null on certain tests that are not dealing with paging + // 
This could also be the case in certain embedded conditions + pagingManager.processReload(); + } + + if (perfBlastPages != -1) { + messageJournal.perfBlast(perfBlastPages); + } + + journalLoader.postLoad(messageJournal, resourceManager, duplicateIDMap); + journalLoaded = true; + return info; + } + finally { + readUnLock(); + } + } + + /** + * @param queueID + * @param pageSubscriptions + * @param queueInfos + * @return + */ + private static PageSubscription locateSubscription(final long queueID, + final Map<Long, PageSubscription> pageSubscriptions, + final Map<Long, QueueBindingInfo> queueInfos, + final PagingManager pagingManager) throws Exception { + + PageSubscription subs = pageSubscriptions.get(queueID); + if (subs == null) { + QueueBindingInfo queueInfo = queueInfos.get(queueID); + + if (queueInfo != null) { + SimpleString address = queueInfo.getAddress(); + PagingStore store = pagingManager.getPageStore(address); + subs = store.getCursorProvider().getSubscription(queueID); + pageSubscriptions.put(queueID, subs); + } + } + + return subs; + } + + // grouping handler operations + public void addGrouping(final GroupBinding groupBinding) throws Exception { + GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(), groupBinding.getGroupId(), groupBinding.getClusterName()); + readLock(); + try { + bindingsJournal.appendAddRecord(groupBinding.getId(), JournalRecordIds.GROUP_RECORD, groupingEncoding, true); + } + finally { + readUnLock(); + } + } + + public void deleteGrouping(long tx, final GroupBinding groupBinding) throws Exception { + readLock(); + try { + bindingsJournal.appendDeleteRecordTransactional(tx, groupBinding.getId()); + } + finally { + readUnLock(); + } + } + + // BindingsImpl operations + + public void addQueueBinding(final long tx, final Binding binding) throws Exception { + Queue queue = (Queue) binding.getBindable(); + + Filter filter = queue.getFilter(); + + SimpleString filterString = filter == null ? 
null : filter.getFilterString(); + + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated()); + + readLock(); + try { + bindingsJournal.appendAddRecordTransactional(tx, binding.getID(), JournalRecordIds.QUEUE_BINDING_RECORD, bindingEncoding); + } + finally { + readUnLock(); + } + } + + public void deleteQueueBinding(long tx, final long queueBindingID) throws Exception { + readLock(); + try { + bindingsJournal.appendDeleteRecordTransactional(tx, queueBindingID); + } + finally { + readUnLock(); + } + } + + public long storePageCounterInc(long txID, long queueID, int value) throws Exception { + readLock(); + try { + long recordID = idGenerator.generateID(); + messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value)); + return recordID; + } + finally { + readUnLock(); + } + } + + public long storePageCounterInc(long queueID, int value) throws Exception { + readLock(); + try { + final long recordID = idGenerator.generateID(); + messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value), true, getContext()); + return recordID; + } + finally { + readUnLock(); + } + } + + @Override + public long storePageCounter(long txID, long queueID, long value) throws Exception { + readLock(); + try { + final long recordID = idGenerator.generateID(); + messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value)); + return recordID; + } + finally { + readUnLock(); + } + } + + @Override + public long storePendingCounter(final long queueID, final long pageID, final int inc) throws Exception { + readLock(); + try { + final long recordID = idGenerator.generateID(); + PageCountPendingImpl pendingInc = new PageCountPendingImpl(queueID, pageID, inc); + 
// We must guarantee the record sync before we actually write on the page otherwise we may get out of sync + // on the counter + messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER, pendingInc, true); + return recordID; + } + finally { + readUnLock(); + } + } + + public void deleteIncrementRecord(long txID, long recordID) throws Exception { + readLock(); + try { + messageJournal.appendDeleteRecordTransactional(txID, recordID); + } + finally { + readUnLock(); + } + } + + public void deletePageCounter(long txID, long recordID) throws Exception { + readLock(); + try { + messageJournal.appendDeleteRecordTransactional(txID, recordID); + } + finally { + readUnLock(); + } + } + + public void deletePendingPageCounter(long txID, long recordID) throws Exception { + readLock(); + try { + messageJournal.appendDeleteRecordTransactional(txID, recordID); + } + finally { + readUnLock(); + } + } + + public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, + final List<GroupingInfo> groupingInfos) throws Exception { + List<RecordInfo> records = new ArrayList<RecordInfo>(); + + List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>(); + + JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null); + + for (RecordInfo record : records) { + long id = record.id; + + ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(record.data); + + byte rec = record.getUserRecordType(); + + if (rec == JournalRecordIds.QUEUE_BINDING_RECORD) { + PersistentQueueBindingEncoding bindingEncoding = newBindingEncoding(id, buffer); + + queueBindingInfos.add(bindingEncoding); + } + else if (rec == JournalRecordIds.ID_COUNTER_RECORD) { + idGenerator.loadState(record.id, buffer); + } + else if (rec == JournalRecordIds.GROUP_RECORD) { + GroupingEncoding encoding = newGroupEncoding(id, buffer); + groupingInfos.add(encoding); + } + else if (rec == 
JournalRecordIds.ADDRESS_SETTING_RECORD) { + PersistedAddressSetting setting = newAddressEncoding(id, buffer); + mapPersistedAddressSettings.put(setting.getAddressMatch(), setting); + } + else if (rec == JournalRecordIds.SECURITY_RECORD) { + PersistedRoles roles = newSecurityRecord(id, buffer); + mapPersistedRoles.put(roles.getAddressMatch(), roles); + } + else { + throw new IllegalStateException("Invalid record type " + rec); + } + } + + // This will instruct the IDGenerator to beforeStop old records + idGenerator.cleanup(); + + return bindingsInfo; + } + + public void lineUpContext() { + readLock(); + try { + messageJournal.lineUpContext(getContext()); + } + finally { + readUnLock(); + } + } + + // ActiveMQComponent implementation + // ------------------------------------------------------ + + protected abstract void beforeStart() throws Exception; + + public synchronized void start() throws Exception { + if (started) { + return; + } + + beforeStart(); + + singleThreadExecutor = Executors.newSingleThreadExecutor(AccessController.doPrivileged(new PrivilegedAction<ActiveMQThreadFactory>() { + @Override + public ActiveMQThreadFactory run() { + return new ActiveMQThreadFactory("ActiveMQ-IO-SingleThread", true, JournalStorageManager.class.getClassLoader()); + } + })); + + bindingsJournal.start(); + + messageJournal.start(); + + started = true; + } + + public void stop() throws Exception { + stop(false); + } + + @Override + public synchronized void persistIdGenerator() { + if (journalLoaded && idGenerator != null) { + // Must call close to make sure last id is persisted + idGenerator.persistCurrentID(); + } + } + + /** + * Assumption is that this is only called with a writeLock on the StorageManager. 
+ */ + protected abstract void performCachedLargeMessageDeletes(); + + public synchronized void stop(boolean ioCriticalError) throws Exception { + if (!started) { + return; + } + + if (!ioCriticalError) { + performCachedLargeMessageDeletes(); + // Must call close to make sure last id is persisted + if (journalLoaded && idGenerator != null) + idGenerator.persistCurrentID(); + } + + final CountDownLatch latch = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + + latch.await(30, TimeUnit.SECONDS); + + beforeStop(); + + bindingsJournal.stop(); + + messageJournal.stop(); + + singleThreadExecutor.shutdown(); + + journalLoaded = false; + + started = false; + } + + protected abstract void beforeStop() throws Exception; + + public synchronized boolean isStarted() { + return started; + } + + /** + * TODO: Is this still being used ? + */ + public JournalLoadInformation[] loadInternalOnly() throws Exception { + readLock(); + try { + JournalLoadInformation[] info = new JournalLoadInformation[2]; + info[0] = bindingsJournal.loadInternalOnly(); + info[1] = messageJournal.loadInternalOnly(); + + return info; + } + finally { + readUnLock(); + } + } + + public void beforePageRead() throws Exception { + if (pageMaxConcurrentIO != null) { + pageMaxConcurrentIO.acquire(); + } + } + + public void afterPageRead() throws Exception { + if (pageMaxConcurrentIO != null) { + pageMaxConcurrentIO.release(); + } + } + + // Public ----------------------------------------------------------------------------------- + + public Journal getMessageJournal() { + return messageJournal; + } + + public Journal getBindingsJournal() { + return bindingsJournal; + } + + // Package protected --------------------------------------------- + + protected void confirmLargeMessage(final LargeServerMessage largeServerMessage) { + if (largeServerMessage.getPendingRecordID() >= 0) { + try { + 
confirmPendingLargeMessage(largeServerMessage.getPendingRecordID()); + largeServerMessage.setPendingRecordID(-1); + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + + protected abstract LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, + ActiveMQBuffer buff) throws Exception; + + private void loadPreparedTransactions(final PostOffice postOffice, + final PagingManager pagingManager, + final ResourceManager resourceManager, + final Map<Long, QueueBindingInfo> queueInfos, + final List<PreparedTransactionInfo> preparedTransactions, + final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap, + final Map<Long, PageSubscription> pageSubscriptions, + final Set<Pair<Long, Long>> pendingLargeMessages, + JournalLoader journalLoader) throws Exception { + // recover prepared transactions + for (PreparedTransactionInfo preparedTransaction : preparedTransactions) { + XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData()); + + Xid xid = encodingXid.xid; + + Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this); + + List<MessageReference> referencesToAck = new ArrayList<MessageReference>(); + + Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>(); + + // Use same method as load message journal to prune out acks, so they don't get added. 
+ // Then have reacknowledge(tx) methods on queue, which needs to add the page size + + // first get any sent messages for this tx and recreate + for (RecordInfo record : preparedTransaction.getRecords()) { + byte[] data = record.data; + + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + + byte recordType = record.getUserRecordType(); + + switch (recordType) { + case JournalRecordIds.ADD_LARGE_MESSAGE: { + messages.put(record.id, parseLargeMessage(messages, buff)); + + break; + } + case JournalRecordIds.ADD_MESSAGE: { + ServerMessage message = new ServerMessageImpl(record.id, 50); + + message.decode(buff); + + messages.put(record.id, message); + + break; + } + case JournalRecordIds.ADD_REF: { + long messageID = record.id; + + RefEncoding encoding = new RefEncoding(); + + encoding.decode(buff); + + ServerMessage message = messages.get(messageID); + + if (message == null) { + throw new IllegalStateException("Cannot find message with id " + messageID); + } + + journalLoader.handlePreparedSendMessage(message, tx, encoding.queueID); + + break; + } + case JournalRecordIds.ACKNOWLEDGE_REF: { + long messageID = record.id; + + RefEncoding encoding = new RefEncoding(); + + encoding.decode(buff); + + journalLoader.handlePreparedAcknowledge(messageID, referencesToAck, encoding.queueID); + + break; + } + case JournalRecordIds.PAGE_TRANSACTION: { + + PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl(); + + pageTransactionInfo.decode(buff); + + if (record.isUpdate) { + PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID()); + pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages()); + } + else { + pageTransactionInfo.setCommitted(false); + + tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo); + + pagingManager.addTransaction(pageTransactionInfo); + + tx.addOperation(new FinishPageMessageOperation()); + } + + break; + } + case 
SET_SCHEDULED_DELIVERY_TIME: { + // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which + // case the message will already have the header for the scheduled delivery time, so no need to do + // anything. + + break; + } + case DUPLICATE_ID: { + // We need load the duplicate ids at prepare time too + DuplicateIDEncoding encoding = new DuplicateIDEncoding(); + + encoding.decode(buff); + + DuplicateIDCache cache = postOffice.getDuplicateIDCache(encoding.address); + + cache.load(tx, encoding.duplID); + + break; + } + case ACKNOWLEDGE_CURSOR: { + CursorAckRecordEncoding encoding = new CursorAckRecordEncoding(); + encoding.decode(buff); + + encoding.position.setRecordID(record.id); + + PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.reloadPreparedACK(tx, encoding.position); + referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub)); + } + else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID); + } + break; + } + case PAGE_CURSOR_COUNTER_VALUE: { + ActiveMQServerLogger.LOGGER.journalPAGEOnPrepared(); + + break; + } + + case PAGE_CURSOR_COUNTER_INC: { + PageCountRecordInc encoding = new PageCountRecordInc(); + + encoding.decode(buff); + + PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); + + if (sub != null) { + sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue()); + sub.notEmpty(); + } + else { + ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); + } + + break; + } + + default: { + ActiveMQServerLogger.LOGGER.journalInvalidRecordType(recordType); + } + } + } + + for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) { + byte[] data = recordDeleted.data; + + if (data.length > 0) { + ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data); + byte b = 
buff.readByte(); + + switch (b) { + case ADD_LARGE_MESSAGE_PENDING: { + long messageID = buff.readLong(); + if (!pendingLargeMessages.remove(new Pair<Long, Long>(recordDeleted.id, messageID))) { + ActiveMQServerLogger.LOGGER.largeMessageNotFound(recordDeleted.id); + } + installLargeMessageConfirmationOnTX(tx, recordDeleted.id); + break; + } + default: + ActiveMQServerLogger.LOGGER.journalInvalidRecordTypeOnPreparedTX(b); + } + } + + } + + journalLoader.handlePreparedTransaction(tx, referencesToAck, xid, resourceManager); + } + } + + OperationContext getContext(final boolean sync) { + if (sync) { + return getContext(); + } + else { + return DummyOperationContext.getInstance(); + } + } + + // Inner Classes + // ---------------------------------------------------------------------------- + + private static final class DummyOperationContext implements OperationContext { + + private static DummyOperationContext instance = new DummyOperationContext(); + + public static OperationContext getInstance() { + return DummyOperationContext.instance; + } + + public void executeOnCompletion(final IOCallback runnable) { + // There are no executeOnCompletion calls while using the DummyOperationContext + // However we keep the code here for correctness + runnable.done(); + } + + public void replicationDone() { + } + + public void replicationLineUp() { + } + + public void storeLineUp() { + } + + public void done() { + } + + public void onError(final int errorCode, final String errorMessage) { + } + + public void waitCompletion() { + } + + public boolean waitCompletion(final long timeout) { + return true; + } + + public void pageSyncLineUp() { + } + + public void pageSyncDone() { + } + } + + /** + * @param id + * @param buffer + * @return + */ + protected static PersistedRoles newSecurityRecord(long id, ActiveMQBuffer buffer) { + PersistedRoles roles = new PersistedRoles(); + roles.decode(buffer); + roles.setStoreId(id); + return roles; + } + + /** + * @param id + * @param buffer + * 
@return + */ + static PersistedAddressSetting newAddressEncoding(long id, ActiveMQBuffer buffer) { + PersistedAddressSetting setting = new PersistedAddressSetting(); + setting.decode(buffer); + setting.setStoreId(id); + return setting; + } + + /** + * @param id + * @param buffer + * @return + */ + static GroupingEncoding newGroupEncoding(long id, ActiveMQBuffer buffer) { + GroupingEncoding encoding = new GroupingEncoding(); + encoding.decode(buffer); + encoding.setId(id); + return encoding; + } + + /** + * @param id + * @param buffer + * @return + */ + protected static PersistentQueueBindingEncoding newBindingEncoding(long id, ActiveMQBuffer buffer) { + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(); + + bindingEncoding.decode(buffer); + + bindingEncoding.setId(id); + return bindingEncoding; + } + + @Override + public boolean addToPage(PagingStore store, + ServerMessage msg, + Transaction tx, + RouteContextList listCtx) throws Exception { + /** + * Exposing the read-lock here is an encapsulation violation done in order to keep the code + * simpler. The alternative would be to add a second method, say 'verifyPaging', to + * PagingStore. + * <p> + * Adding this second method would also be more surprise prone as it would require a certain + * calling order. + * <p> + * The reasoning is that exposing the lock is more explicit and therefore `less bad`. + */ + return store.page(msg, tx, listCtx, storageManagerLock.readLock()); + } + + private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) { + TXLargeMessageConfirmationOperation txoper = (TXLargeMessageConfirmationOperation) tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS); + if (txoper == null) { + txoper = new TXLargeMessageConfirmationOperation(this); + tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper); + } + txoper.confirmedMessages.add(recordID); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64f74acd/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java index fdae483..3ca38e3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java @@ -26,11 +26,9 @@ public final class AddMessageRecord { final ServerMessage message; - // mtaylor (Added to compile) - public long scheduledDeliveryTime; + private long scheduledDeliveryTime; - // mtaylor (Added to compile) - public int deliveryCount; + private int deliveryCount; public ServerMessage getMessage() { return message; @@ -44,4 +42,11 @@ public final class AddMessageRecord { return deliveryCount; } + public void setScheduledDeliveryTime(long scheduledDeliveryTime) { + this.scheduledDeliveryTime = scheduledDeliveryTime; + } + + public void setDeliveryCount(int deliveryCount) { + this.deliveryCount = deliveryCount; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64f74acd/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java new file mode 100644 index 0000000..4616e78 --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.core.persistence.impl.journal;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
import org.apache.activemq.artemis.utils.ExecutorFactory;

/**
 * A {@link JournalStorageManager} variant that keeps both the bindings journal and the message
 * journal in JDBC-backed tables instead of journal files on disk.
 */
public class JDBCJournalStorageManager extends JournalStorageManager {

   public JDBCJournalStorageManager(Configuration config, ExecutorFactory executorFactory) {
      super(config, executorFactory);
   }

   public JDBCJournalStorageManager(final Configuration config,
                                    final ExecutorFactory executorFactory,
                                    final IOCriticalErrorListener criticalErrorListener) {
      super(config, executorFactory, criticalErrorListener);
   }

   @Override
   protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
      // The store configuration is expected to be the database flavour when this manager is used.
      DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) config.getStoreConfiguration();

      Journal localBindings = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getBindingsTableName());
      bindingsJournal = localBindings;

      Journal localMessage = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getMessageTableName());
      messageJournal = localMessage;
   }

   @Override
   public synchronized void stop(boolean ioCriticalError) throws Exception {
      if (!started) {
         return;
      }

      if (!ioCriticalError) {
         performCachedLargeMessageDeletes();
         // Must call close to make sure last id is persisted
         if (journalLoaded && idGenerator != null)
            idGenerator.persistCurrentID();
      }

      // Drain previously submitted work: the latch task runs last in FIFO order.
      final CountDownLatch latch = new CountDownLatch(1);
      executor.execute(new Runnable() {
         @Override
         public void run() {
            latch.countDown();
         }
      });

      latch.await(30, TimeUnit.SECONDS);

      beforeStop();

      // NOTE(review): the bindings journal is stopped via the JDBC-specific stop(false)
      // while the message journal uses the plain stop() - presumably intentional, confirm.
      ((JDBCJournalImpl) bindingsJournal).stop(false);

      messageJournal.stop();

      singleThreadExecutor.shutdown();

      journalLoaded = false;

      started = false;
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64f74acd/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java index 0242b50..4aa470b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java @@ -27,7 +27,6 @@ public final class JournalRecordIds { // grouping journal record type - //
mtaylor Added to compile public static final byte GROUP_RECORD = 20; // BindingsImpl journal record type