http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index eb7cda1..2108be7 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -31,11 +31,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; @@ -366,10 +368,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); appendRecord(r); } @@ -377,12 +379,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public void appendAddRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion completionCallback) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); r.setIoCompletion(completionCallback); appendRecord(r); @@ -398,10 +401,10 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + public void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); appendRecord(r); } @@ -409,12 +412,13 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public void appendUpdateRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion completionCallback) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setSync(sync); r.setIoCompletion(completionCallback); appendRecord(r); @@ -448,10 +452,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendAddRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setTxId(txID); appendRecord(r); } @@ -469,10 +474,11 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { public void appendUpdateRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX, seq.incrementAndGet()); r.setUserRecordType(recordType); - r.setRecord(record); + r.setRecord(persister, record); r.setTxId(txID); appendRecord(r); } @@ -488,7 +494,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { @Override public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception { JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX, seq.incrementAndGet()); - r.setRecord(record); + r.setRecord(EncoderPersister.getInstance(), record); r.setTxId(txID); appendRecord(r); } @@ -685,10 +691,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void perfBlast(int pages) { - } - - @Override public void runDirectJournalBlast() throws Exception { }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java ---------------------------------------------------------------------- diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 9691d3e..b094164 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; @@ -237,11 +238,11 @@ class JDBCJournalRecord { this.record = record; } - public void setRecord(EncodingSupport record) { - this.variableSize = record.getEncodeSize(); + public void setRecord(Persister persister, Object record) { + this.variableSize = persister.getEncodeSize(record); ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(variableSize); - record.encode(encodedBuffer); + persister.encode(encodedBuffer, record); this.record = new ActiveMQBufferInputStream(encodedBuffer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java index 59f04e8..6da3912 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQBytesMessage.java @@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; @@ -374,7 +374,7 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag if (bodyLength == 0) return null; byte[] dst = new byte[bodyLength]; - message.getBodyBuffer().getBytes(MessageImpl.BODY_OFFSET, dst); + message.getBodyBuffer().getBytes(CoreMessage.BODY_OFFSET, dst); return (T) dst; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java index 47dcfb2..80a07ac 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java @@ -43,7 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.UUID; @@ -293,7 +293,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public String getJMSMessageID() { if (msgID == null) { - UUID uid = message.getUserID(); + UUID uid = (UUID)message.getUserID(); msgID = uid == null ? null : "ID:" + uid.toString(); } @@ -397,7 +397,7 @@ public class ActiveMQMessage implements javax.jms.Message { @Override public Destination getJMSDestination() throws JMSException { if (dest == null) { - SimpleString address = message.getAddress(); + SimpleString address = message.getAddressSimpleString(); String prefix = ""; if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) { RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)); @@ -756,7 +756,7 @@ public class ActiveMQMessage implements javax.jms.Message { @SuppressWarnings("unchecked") protected <T> T getBodyInternal(Class<T> c) throws MessageFormatException { - InputStream is = ((MessageInternal) message).getBodyInputStream(); + InputStream is = ((ClientMessageInternal) message).getBodyInputStream(); try { ObjectInputStream ois = new ObjectInputStream(is); return (T) ois.readObject(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java index 6cf20ff..289f88c 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.jms.transaction; import javax.transaction.xa.Xid; import java.util.Map; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionDetail; import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage; @@ -36,7 +36,7 @@ public class JMSTransactionDetail extends TransactionDetail { } @Override - public String decodeMessageType(ServerMessage msg) { + public String decodeMessageType(Message msg) { int type = msg.getType(); switch (type) { case ActiveMQMessage.TYPE: // 0 @@ -57,7 +57,7 @@ public class JMSTransactionDetail extends TransactionDetail { } @Override - public Map<String, Object> decodeMessageProperties(ServerMessage msg) { + public Map<String, Object> decodeMessageProperties(Message msg) { try { return ActiveMQMessage.coreMaptoJMSMap(msg.toMap()); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java new file mode 100644 index 0000000..8fc2a5aa --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/EncoderPersister.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.journal; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.core.persistence.Persister; + +/** This is a facade between the new Persister and the former EncodingSupport. + * Methods using the old interface will use this as a facade to provide the previous semantic. */ +public class EncoderPersister implements Persister<EncodingSupport> { + + private static final EncoderPersister theInstance = new EncoderPersister(); + + private EncoderPersister() { + } + + public static EncoderPersister getInstance() { + return theInstance; + } + + @Override + public int getEncodeSize(EncodingSupport record) { + return record.getEncodeSize(); + } + + @Override + public void encode(ActiveMQBuffer buffer, EncodingSupport record) { + record.encode(buffer); + } + + @Override + public EncodingSupport decode(ActiveMQBuffer buffer, EncodingSupport record) { + record.decode(buffer); + return record; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index fbd4182..ca194b8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.journal.impl.JournalFile; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQComponent; /** @@ -60,23 +61,49 @@ public interface Journal extends ActiveMQComponent { void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; - void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception; + default void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync); + } + + void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; void appendAddRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion completionCallback) throws Exception; + default void appendAddRecord(long id, + byte recordType, + EncodingSupport record, + boolean sync, + IOCompletion completionCallback) throws Exception { + appendAddRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); + } + void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; - void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception; + default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); + } - void appendUpdateRecord(long id, + void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; + + default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, - IOCompletion completionCallback) throws Exception; + IOCompletion completionCallback) throws Exception { + appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, completionCallback); + } + + void appendUpdateRecord(final long id, + final byte recordType, + final Persister persister, + final Object record, + final boolean sync, + final IOCompletion callback) throws Exception; void appendDeleteRecord(long id, boolean sync) throws Exception; @@ -86,11 +113,23 @@ public interface Journal extends ActiveMQComponent { void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; - void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception; + default void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception { + appendAddRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record); + } + + void appendAddRecordTransactional(final long txID, + final long id, + final byte recordType, + final Persister persister, + final Object record) throws Exception; void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception; - void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception; + default void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception { + appendUpdateRecordTransactional(txID, id, recordType, EncoderPersister.getInstance(), record); + } + + void appendUpdateRecordTransactional(long txID, long id, byte recordType, Persister persister, Object record) throws Exception; void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception; @@ -165,8 +204,6 @@ public interface Journal extends ActiveMQComponent { int getUserVersion(); - void perfBlast(int pages); - void runDirectJournalBlast() throws Exception; /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index 8bbecd2..943077c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord; @@ -127,7 +128,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback } } - JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, new ByteArrayEncoding(filesToRename.toByteBuffer().array())); + JournalInternalRecord controlRecord = new JournalAddRecord(true, 1, (byte) 0, EncoderPersister.getInstance(), new ByteArrayEncoding(filesToRename.toByteBuffer().array())); ActiveMQBuffer renameBuffer = ActiveMQBuffers.dynamicBuffer(filesToRename.writerIndex()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 0b702a5..8e5ca2c 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; @@ -90,10 +91,11 @@ public final class FileWrapperJournal extends JournalBase { @Override public void appendAddRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion callback) throws Exception { - JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record); writeRecord(addRecord, sync, callback); } @@ -144,19 +146,21 @@ public final class FileWrapperJournal extends JournalBase { public void appendAddRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { count(txID); - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); writeRecord(addRecord, false, null); } @Override public void appendUpdateRecord(long id, byte recordType, - EncodingSupport record, + Persister persister, + Object record, boolean sync, IOCompletion callback) throws Exception { - JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); writeRecord(updateRecord, sync, callback); } @@ -164,9 +168,10 @@ public final class FileWrapperJournal extends JournalBase { public void appendUpdateRecordTransactional(long txID, long id, byte recordType, - EncodingSupport record) throws Exception { + Persister persister, + Object record) throws Exception { count(txID); - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, persister, record); writeRecord(updateRecordTX, false, null); } @@ -261,11 +266,6 @@ public final class FileWrapperJournal extends JournalBase { } @Override - public void perfBlast(int pages) { - throw new UnsupportedOperationException(); - } - - @Override public void runDirectJournalBlast() throws Exception { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index e2ca84d..e6bd99e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; abstract class JournalBase implements Journal { @@ -37,68 +38,15 @@ abstract class JournalBase implements Journal { } @Override - public abstract void appendAddRecord(final long id, - final byte recordType, - final EncodingSupport record, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendAddRecordTransactional(final long txID, - final long id, - final byte recordType, - final EncodingSupport record) throws Exception; - - @Override - public abstract void appendCommitRecord(final long txID, - final boolean sync, - final IOCompletion callback, - boolean lineUpContext) throws Exception; - - @Override - public abstract void appendDeleteRecord(final long id, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendDeleteRecordTransactional(final long txID, - final long id, - final EncodingSupport record) throws Exception; - - @Override - public abstract void appendPrepareRecord(final long txID, - final EncodingSupport transactionData, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendUpdateRecord(final long id, - final byte recordType, - final EncodingSupport record, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override - public abstract void appendUpdateRecordTransactional(final long txID, - final long id, - final byte recordType, - final EncodingSupport record) throws Exception; - - @Override - public abstract void appendRollbackRecord(final long txID, - final boolean sync, - final IOCompletion callback) throws Exception; - - @Override public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception { appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync); } @Override - public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { + public void appendAddRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - appendAddRecord(id, recordType, record, sync, callback); + appendAddRecord(id, recordType, persister, record, sync, callback); if (callback != null) { callback.waitCompletion(); @@ -176,11 +124,12 @@ abstract class JournalBase implements Journal { @Override public void appendUpdateRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - appendUpdateRecord(id, recordType, record, sync, callback); + appendUpdateRecord(id, recordType, persister, record, sync, callback); if (callback != null) { callback.waitCompletion(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index b95d641..c62b27b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.journal.EncoderPersister; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; @@ -252,7 +253,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ @Override public void onReadAddRecord(final RecordInfo info) throws Exception { if (lookupRecord(info.id)) { - JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)); + JournalInternalRecord addRecord = new JournalAddRecord(true, info.id, info.getUserRecordType(), EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); addRecord.setCompactCount((short) (info.compactCount + 1)); checkSize(addRecord.getEncodeSize(), info.compactCount); @@ -268,7 +269,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), new ByteArrayEncoding(info.data)); + JournalInternalRecord record = new JournalAddRecordTX(true, transactionID, info.id, info.getUserRecordType(), EncoderPersister.getInstance(),new ByteArrayEncoding(info.data)); record.setCompactCount((short) (info.compactCount + 1)); @@ -374,7 +375,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ @Override public void onReadUpdateRecord(final RecordInfo info) throws Exception { if (lookupRecord(info.id)) { - JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, new ByteArrayEncoding(info.data)); + JournalInternalRecord updateRecord = new JournalAddRecord(false, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); updateRecord.setCompactCount((short) (info.compactCount + 1)); @@ -397,7 +398,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id)) { JournalTransaction newTransaction = getNewJournalTransaction(transactionID); - JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, new ByteArrayEncoding(info.data)); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, transactionID, info.id, info.userRecordType, EncoderPersister.getInstance(), new ByteArrayEncoding(info.data)); updateRecordTX.setCompactCount((short) (info.compactCount + 1)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index db615f8..24bb916 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -57,11 +57,11 @@ import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TestableJournal; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; -import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX; import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalCompleteRecordTX; @@ -713,7 +713,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public void appendAddRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); @@ -727,7 +728,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void run() { journalLock.readLock().lock(); try { - JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, persister, record); JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback); records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize())); @@ -762,7 +763,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal @Override public void appendUpdateRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); @@ -777,7 +779,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal journalLock.readLock().lock(); try { JournalRecord jrnRecord = records.get(id); - JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record); + JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback); if (logger.isTraceEnabled()) { @@ -873,7 +875,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { checkJournalIsLoaded(); final JournalTransaction tx = getTransactionInfo(txID); @@ -885,7 +888,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void run() { journalLock.readLock().lock(); try { - JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record); + JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); if (logger.isTraceEnabled()) { @@ -952,7 +955,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { checkJournalIsLoaded(); final JournalTransaction tx = getTransactionInfo(txID); @@ -965,7 +969,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal journalLock.readLock().lock(); try { - JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record ); + JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, persister, record ); JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null ); if ( logger.isTraceEnabled() ) { @@ -2165,45 +2169,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } } - @Override - public void perfBlast(final int pages) { - - checkJournalIsLoaded(); - - final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]); - - final JournalInternalRecord blastRecord = new JournalInternalRecord() { - - @Override - public int getEncodeSize() { - return byteEncoder.getEncodeSize(); - } - - @Override - public void encode(final ActiveMQBuffer buffer) { - byteEncoder.encode(buffer); - } - }; - - appendExecutor.execute(new Runnable() { - @Override - public void run() { - journalLock.readLock().lock(); - try { - - for (int i = 0; i < pages; i++) { - appendRecord(blastRecord, false, false, null, null); - } - - } catch (Exception e) { - ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e); - } finally { - journalLock.readLock().unlock(); - } - } - }); - } - // ActiveMQComponent implementation // --------------------------------------------------- @@ -2921,5 +2886,4 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public int getCompactCount() { return compactCount; } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java index c6a5d4a..6e5b651 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java @@ -17,14 +17,16 @@ package org.apache.activemq.artemis.core.journal.impl.dataformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; public class JournalAddRecord extends JournalInternalRecord { protected final long id; - protected final EncodingSupport record; + protected final Persister persister; + + protected final Object record; protected final byte recordType; @@ -35,7 +37,7 @@ public class JournalAddRecord extends JournalInternalRecord { * @param recordType * @param record */ - public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record) { + public JournalAddRecord(final boolean add, final long id, final byte recordType, final Persister persister, Object record) { this.id = id; this.record = record; @@ -43,6 +45,8 @@ public class JournalAddRecord extends JournalInternalRecord { this.recordType = recordType; this.add = add; + + this.persister = persister; } @Override @@ -59,17 +63,19 @@ public class JournalAddRecord extends JournalInternalRecord { buffer.writeLong(id); - buffer.writeInt(record.getEncodeSize()); + int recordEncodeSize = persister.getEncodeSize(record); + + buffer.writeInt(persister.getEncodeSize(record)); buffer.writeByte(recordType); - record.encode(buffer); + persister.encode(buffer, record); - buffer.writeInt(getEncodeSize()); + buffer.writeInt(recordEncodeSize + JournalImpl.SIZE_ADD_RECORD + 1); } @Override public int getEncodeSize() { - return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize() + 1; + return JournalImpl.SIZE_ADD_RECORD + persister.getEncodeSize(record) + 1; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java index 6cec122..483418f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecordTX.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.core.journal.impl.dataformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.JournalImpl; public class JournalAddRecordTX extends JournalInternalRecord { @@ -26,7 +26,9 @@ public class JournalAddRecordTX extends JournalInternalRecord { private final long id; - private final EncodingSupport record; + protected final Persister persister; + + protected final Object record; private final byte recordType; @@ -41,12 +43,15 @@ public class JournalAddRecordTX extends JournalInternalRecord { final long txID, final long id, final byte recordType, - final EncodingSupport record) { + final Persister persister, + Object record) { this.txID = txID; this.id = id; + this.persister = persister; + this.record = record; this.recordType = recordType; @@ -70,17 +75,17 @@ public class JournalAddRecordTX extends JournalInternalRecord { buffer.writeLong(id); - buffer.writeInt(record.getEncodeSize()); + buffer.writeInt(persister.getEncodeSize(record)); buffer.writeByte(recordType); - record.encode(buffer); + persister.encode(buffer, record); buffer.writeInt(getEncodeSize()); } @Override public int getEncodeSize() { - return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize() + 1; + return JournalImpl.SIZE_ADD_RECORD_TX + persister.getEncodeSize(record) + 1; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java new file mode 100644 index 0000000..ee2f870 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.broker; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Set; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.RefCountMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.encode.BodyType; +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.apache.qpid.proton.util.TLSEncoder; + +// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format +public class AMQPMessage extends RefCountMessage { + + final long messageFormat; + private ProtonProtocolManager protocolManager; + ByteBuf data; + boolean bufferValid; + byte type; + long messageID; + String address; + MessageImpl protonMessage; + private long expiration = 0; + // this can be used to encode the header again and the rest of the message buffer + private int headerEnd = -1; + private Header _header; + private DeliveryAnnotations _deliveryAnnotations; + private MessageAnnotations _messageAnnotations; + private Properties _properties; + private ApplicationProperties applicationProperties; + + public AMQPMessage(long messageFormat, byte[] data, ProtonProtocolManager protocolManager) { + this.protocolManager = protocolManager; + this.data = Unpooled.wrappedBuffer(data); + this.messageFormat = messageFormat; + this.bufferValid = true; + + } + + /** for persistence reload */ + public AMQPMessage(long messageFormat) { + this.messageFormat = messageFormat; + this.bufferValid = false; + + } + + public AMQPMessage(long messageFormat, Message message, ProtonProtocolManager protocolManager) { + this.protocolManager = protocolManager; + this.protonMessage = (MessageImpl)message; + this.messageFormat = messageFormat; + + } + + public AMQPMessage(Message message, ProtonProtocolManager protocolManager) { + this(0, message, protocolManager); + } + + public MessageImpl getProtonMessage() { + if (protonMessage == null) { + protonMessage = (MessageImpl) Message.Factory.create(); + + if (data != null) { + data.readerIndex(0); + protonMessage.decode(data.nioBuffer()); + this._header = protonMessage.getHeader(); + protonMessage.setHeader(null); + } + } + + return protonMessage; + } + + private void initalizeObjects() { + if (protonMessage == null) { + if (data == null) { + this.headerEnd = -1; + _header = new Header(); + _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); + _properties = new Properties(); + this.applicationProperties = new ApplicationProperties(new HashMap<>()); + this.protonMessage = (MessageImpl)Message.Factory.create(); + this.protonMessage.setApplicationProperties(applicationProperties); + this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations); + } + } + } + + private ApplicationProperties getApplicationProperties() { + if (applicationProperties == null) { + if (data != null) { + partialDecode(data.nioBuffer(), true); + } else { + initalizeObjects(); + } + } + + return applicationProperties; + } + + public Header getHeader() { + if (_header == null) { + if (data == null) { + initalizeObjects(); + } else { + partialDecode(this.data.nioBuffer(), false); + } + } + + return _header; + } + + public Properties getProperties() { + if (_properties == null) { + if (data == null) { + initalizeObjects(); + } else { + partialDecode(this.data.nioBuffer(), true); + } + } + + return _properties; + } + + @Override + public Persister<org.apache.activemq.artemis.api.core.Message> getPersister() { + return AMQPMessagePersister.getInstance(); + } + + private synchronized void partialDecode(ByteBuffer buffer, boolean readApplicationProperties) { + DecoderImpl decoder = TLSEncoder.getDecoder(); + decoder.setByteBuffer(buffer); + buffer.position(0); + + _header = null; + _deliveryAnnotations = null; + _messageAnnotations = null; + _properties = null; + applicationProperties = null; + Section section = null; + + try { + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } + + if (section instanceof Header) { + headerEnd = buffer.position(); + _header = (Header) section; + + if (!readApplicationProperties) { + return; + } + + if (buffer.hasRemaining() && readApplicationProperties) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + } + + if (!readApplicationProperties) { + return; + } + if (section instanceof DeliveryAnnotations) { + _deliveryAnnotations = (DeliveryAnnotations) section; + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + + } + if (section instanceof MessageAnnotations) { + _messageAnnotations = (MessageAnnotations) section; + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + + } + if (section instanceof Properties) { + _properties = (Properties) section; + + if (_header.getTtl() != null) { + this.expiration = System.currentTimeMillis() + _header.getTtl().intValue(); + } + + if (buffer.hasRemaining()) { + section = (Section) decoder.readObject(); + } else { + section = null; + } + + } + if (section instanceof ApplicationProperties) { + applicationProperties = (ApplicationProperties) section; + } + } finally { + decoder.setByteBuffer(null); + } + } + + public long getMessageFormat() { + return messageFormat; + } + + public int getLength() { + return data.array().length; + } + + public byte[] getArray() { + return data.array(); + } + + @Override + public void messageChanged() { + bufferValid = false; + this.data = null; + } + + // TODO-now this only make sense on Core + @Override + public ActiveMQBuffer getBodyBuffer() { + return null; + } + + // TODO-now this only make sense on Core + @Override + public ActiveMQBuffer getReadOnlyBodyBuffer() { + return null; + } + + // TODO: Refactor Large message + @Override + public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { + return null; + } + + @Override + public byte getType() { + // TODO-now: what to do here? + return type; + } + + @Override + public AMQPMessage setType(byte type) { + this.type = type; + return this; + } + + @Override + public boolean isLargeMessage() { + return false; + } + + @Override + public ByteBuf getBuffer() { + if (data == null) { + return null; + } else { + return Unpooled.wrappedBuffer(data); + } + } + + @Override + public AMQPMessage setBuffer(ByteBuf buffer) { + this.data = null; + return this; + } + + @Override + public org.apache.activemq.artemis.api.core.Message copy() { + // TODO-now: what to do with this? + AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager); + return newEncode; + } + + @Override + public org.apache.activemq.artemis.api.core.Message copy(long newID) { + return copy().setMessageID(newID); + } + + @Override + public long getMessageID() { + return messageID; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setMessageID(long id) { + this.messageID = id; + return this; + } + + @Override + public long getExpiration() { + return expiration; + } + + @Override + public AMQPMessage setExpiration(long expiration) { + this.expiration = expiration; + return this; + } + + @Override + public Object getUserID() { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setUserID(Object userID) { + return null; + } + + @Override + public void copyHeadersAndProperties(org.apache.activemq.artemis.api.core.Message msg) { + + } + + @Override + public boolean isDurable() { + if (getHeader() != null) { + return getHeader().getDurable().booleanValue(); + } else { + return false; + } + } + + @Override + public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { + return null; + } + + @Override + public Object getProtocol() { + return protocolManager; + } + + @Override + public AMQPMessage setProtocol(Object protocol) { + this.protocolManager = (ProtonProtocolManager)protocol; + return this; + } + + @Override + public Object getBody() { + return null; + } + + @Override + public BodyType getBodyType() { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setBody(BodyType type, Object body) { + return null; + } + + @Override + public String getAddress() { + if (address == null) { + Properties properties = getProtonMessage().getProperties(); + if (properties != null) { + return properties.getTo(); + } else { + return null; + } + } else { + return address; + } + } + + @Override + public AMQPMessage setAddress(SimpleString address) { + return setAddress(address.toString()); + } + + @Override + public AMQPMessage setAddress(String address) { + this.address = address; + return this; + } + + @Override + public SimpleString getAddressSimpleString() { + return SimpleString.toSimpleString(getAddress()); + } + + @Override + public long getTimestamp() { + return 0; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) { + return null; + } + + @Override + public byte getPriority() { + return 0; + } + + @Override + public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) { + return null; + } + + @Override + public void receiveBuffer(ByteBuf buffer) { + + } + + private synchronized void checkBuffer() { + if (!bufferValid) { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500); + try { + getProtonMessage().encode(new NettyWritable(buffer)); + byte[] bytes = new byte[buffer.writerIndex()]; + buffer.readBytes(bytes); + this.data = Unpooled.wrappedBuffer(bytes); + } finally { + buffer.release(); + } + } + } + + @Override + public void sendBuffer(ByteBuf buffer, int deliveryCount) { + // TODO: do I need to change the Header with deliveryCount? + // I would send a new instance of Header with a new delivery count, and only send partial of the buffer + // previously received + checkBuffer(); + buffer.writeBytes(data); + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBooleanProperty(String key, boolean value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putByteProperty(String key, byte value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBytesProperty(String key, byte[] value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putShortProperty(String key, short value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putCharProperty(String key, char value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putIntProperty(String key, int value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putLongProperty(String key, long value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putFloatProperty(String key, float value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putDoubleProperty(String key, double value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBooleanProperty(SimpleString key, boolean value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putByteProperty(SimpleString key, byte value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putBytesProperty(SimpleString key, byte[] value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putShortProperty(SimpleString key, short value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putCharProperty(SimpleString key, char value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putIntProperty(SimpleString key, int value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putLongProperty(SimpleString key, long value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putFloatProperty(SimpleString key, float value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putDoubleProperty(SimpleString key, double value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putStringProperty(String key, String value) { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putObjectProperty(String key, + Object value) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putObjectProperty(SimpleString key, + Object value) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Object removeProperty(String key) { + return null; + } + + @Override + public boolean containsProperty(String key) { + return false; + } + + @Override + public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Object getObjectProperty(String key) { + return null; + } + + @Override + public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public String getStringProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public byte[] getBytesProperty(String key) throws ActiveMQPropertyConversionException { + return new byte[0]; + } + + @Override + public Object removeProperty(SimpleString key) { + return null; + } + + @Override + public boolean containsProperty(SimpleString key) { + return false; + } + + @Override + public Boolean getBooleanProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Byte getByteProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Double getDoubleProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Integer getIntProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Long getLongProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Object getObjectProperty(SimpleString key) { + return null; + } + + @Override + public Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public String getStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public SimpleString getSimpleStringProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return null; + } + + @Override + public byte[] getBytesProperty(SimpleString key) throws ActiveMQPropertyConversionException { + return new byte[0]; + } + + @Override + public org.apache.activemq.artemis.api.core.Message putStringProperty(SimpleString key, SimpleString value) { + return null; + } + + @Override + public int getEncodeSize() { + return 0; + } + + @Override + public Set<SimpleString> getPropertyNames() { + return Collections.emptySet(); + } + + @Override + public int getMemoryEstimate() { + return 0; + } + + @Override + public org.apache.activemq.artemis.api.core.Message toCore() { + MessageImpl protonMessage = getProtonMessage(); + return null; + } + + @Override + public int getPersistSize() { + checkBuffer(); + return data.array().length + DataConstants.SIZE_INT; + } + + @Override + public void persist(ActiveMQBuffer targetRecord) { + checkBuffer(); + targetRecord.writeInt(data.array().length); + targetRecord.writeBytes(data.array()); + } + + @Override + public void reloadPersistence(ActiveMQBuffer record) { + int size = record.readInt(); + byte[] recordArray = new byte[size]; + record.readBytes(recordArray); + this.data = Unpooled.wrappedBuffer(recordArray); + this.bufferValid = true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java new file mode 100644 index 0000000..3b5bdda --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersister.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.broker; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; +import org.apache.activemq.artemis.utils.DataConstants; + +public class AMQPMessagePersister extends MessagePersister { + + public static AMQPMessagePersister theInstance = new AMQPMessagePersister(); + + public static AMQPMessagePersister getInstance() { + return theInstance; + } + + private AMQPMessagePersister() { + } + + @Override + protected byte getID() { + return ProtonProtocolManagerFactory.ID; + } + + @Override + public int getEncodeSize(Message record) { + return DataConstants.SIZE_BYTE + record.getPersistSize() + + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG; + } + + + /** Sub classes must add the first short as the protocol-id */ + @Override + public void encode(ActiveMQBuffer buffer, Message record) { + super.encode(buffer, record); + AMQPMessage msgEncode = (AMQPMessage)record; + buffer.writeLong(record.getMessageID()); + buffer.writeLong(msgEncode.getMessageFormat()); + buffer.writeNullableSimpleString(record.getAddressSimpleString()); + record.persist(buffer); + } + + + @Override + public Message decode(ActiveMQBuffer buffer, Message record) { + long id = buffer.readLong(); + long format = buffer.readLong(); + SimpleString address = buffer.readNullableSimpleString(); + record = new AMQPMessage(format); + record.reloadPersistence(buffer); + record.setMessageID(id); + if (address != null) { + record.setAddress(address); + } + return record; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 18c6b05..0b02838 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; @@ -34,14 +35,12 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; @@ -69,7 +68,6 @@ import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; -import io.netty.buffer.ByteBuf; import org.jboss.logging.Logger; public class AMQPSessionCallback implements SessionCallback { @@ -298,11 +296,11 @@ public class AMQPSessionCallback implements SessionCallback { } } - public long encodeMessage(Object message, int deliveryCount, WritableBuffer buffer) throws Exception { + public long encodeMessage(Message message, int deliveryCount, WritableBuffer buffer) throws Exception { ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter(); // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode. - return (long) converter.outbound((ServerMessage) message, deliveryCount, buffer); + return (long) converter.outbound(message, deliveryCount, buffer); } public String tempQueueName() { @@ -321,22 +319,22 @@ public class AMQPSessionCallback implements SessionCallback { } } - public void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception { + public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception { if (transaction == null) { transaction = serverSession.getCurrentTransaction(); } recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, ((ServerMessage) message).getMessageID()); + ((ServerConsumer) brokerConsumer).individualAcknowledge(transaction, message.getMessageID()); } finally { resetContext(); } } - public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception { + public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception { recoverContext(); try { - ((ServerConsumer) brokerConsumer).individualCancel(((ServerMessage) message).getMessageID(), updateCounts); + ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts); } finally { resetContext(); } @@ -351,11 +349,8 @@ public class AMQPSessionCallback implements SessionCallback { final Delivery delivery, String address, int messageFormat, - ByteBuf messageEncoded) throws Exception { - EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex()); - - ServerMessage message = manager.getConverter().inbound(encodedMessage); - //use the address on the receiver if not null, if null let's hope it was set correctly on the message + byte[] data) throws Exception { + AMQPMessage message = new AMQPMessage(messageFormat, data, manager); if (address != null) { message.setAddress(new SimpleString(address)); } else { @@ -372,7 +367,7 @@ public class AMQPSessionCallback implements SessionCallback { recoverContext(); - PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress()); + PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); if (store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) if (delivery.remotelySettled()) { @@ -401,7 +396,7 @@ public class AMQPSessionCallback implements SessionCallback { } private void serverSend(final Transaction transaction, - final ServerMessage message, + final Message message, final Delivery delivery, final Receiver receiver) throws Exception { try { @@ -416,8 +411,8 @@ public class AMQPSessionCallback implements SessionCallback { synchronized (connection.getLock()) { delivery.disposition(Accepted.getInstance()); delivery.settle(); - connection.flush(); } + connection.flush(true); } @Override @@ -492,7 +487,7 @@ public class AMQPSessionCallback implements SessionCallback { } @Override - public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString()); @@ -512,7 +507,7 @@ public class AMQPSessionCallback implements SessionCallback { @Override public int sendLargeMessage(MessageReference ref, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java index bef8ef0..98ec228 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -32,6 +34,8 @@ import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> { + public static final byte ID = 2; + private static final String AMQP_PROTOCOL_NAME = "AMQP"; private static final String MODULE_NAME = "artemis-amqp-protocol"; @@ -39,6 +43,16 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME}; @Override + public byte getStoreID() { + return ID; + } + + @Override + public Persister<Message> getPersister() { + return AMQPMessagePersister.getInstance(); + } + + @Override public ProtocolManager createProtocolManager(ActiveMQServer server, final Map<String, Object> parameters, List<BaseInterceptor> incomingInterceptors,