Author: rgodfrey Date: Fri Jan 25 15:38:45 2013 New Revision: 1438556 URL: http://svn.apache.org/viewvc?rev=1438556&view=rev Log: QPID-4550 : AMQP 1.0 Persistent Messages cause failure on restart
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java (original) +++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java Fri Jan 25 15:38:45 2013 @@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.framing; +import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.FrameBody; import java.nio.ByteBuffer; @@ -65,4 +66,11 @@ public abstract class AMQFrame<T> return _frameBody; } + @Override + public String toString() + { + return "AMQFrame{" + + "frameBody=" + _frameBody + + '}'; + } } Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original) +++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Jan 25 15:38:45 2013 @@ -1534,7 +1534,8 @@ public abstract class AbstractBDBMessage else { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); + buf.limit(length); buf.position(0); return buf; } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java Fri Jan 25 15:38:45 2013 @@ -342,6 +342,14 @@ public class MessageMetaData_1_0 impleme { private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); + private MetaDataFactory() + { + _typeRegistry.registerTransportLayer(); + _typeRegistry.registerMessagingLayer(); + _typeRegistry.registerTransactionLayer(); + _typeRegistry.registerSecurityLayer(); + } + public MessageMetaData_1_0 createMetaData(ByteBuffer buf) { ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -354,7 +362,8 @@ public class MessageMetaData_1_0 impleme try { ByteBuffer encodedBuf = buf.duplicate(); - sections.add((Section) valueHandler.parse(buf)); + Object parse = valueHandler.parse(buf); + sections.add((Section) parse); encodedBuf.limit(buf.position()); encodedSections.add(encodedBuf); Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Jan 25 15:38:45 2013 @@ -23,7 +23,9 @@ package org.apache.qpid.server.protocol. import java.lang.ref.WeakReference; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData_1_0; @@ -33,11 +35,45 @@ import org.apache.qpid.server.store.Stor public class Message_1_0 implements ServerMessage, InboundMessage { + + + private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount"); + + private volatile int _referenceCount = 0; + private final StoredMessage<MessageMetaData_1_0> _storedMessage; private List<ByteBuffer> _fragments; private WeakReference<Session_1_0> _session; + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage) + { + _storedMessage = storedMessage; + _session = null; + _fragments = restoreFragments(storedMessage); + } + + private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage) + { + ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>(); + final int FRAGMENT_SIZE = 2048; + int offset = 0; + ByteBuffer b; + do + { + + b = storedMessage.getContent(offset,FRAGMENT_SIZE); + if(b.hasRemaining()) + { + fragments.add(b); + offset+= b.remaining(); + } + } + while(b.hasRemaining()); + return fragments; + } + public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final List<ByteBuffer> fragments, final Session_1_0 session) @@ -142,7 +178,61 @@ public class Message_1_0 implements Serv public Session_1_0 getSession() { - return _session.get(); + return _session == null ? null : _session.get(); + } + + + public boolean incrementReference() + { + if(_refCountUpdater.incrementAndGet(this) <= 0) + { + _refCountUpdater.decrementAndGet(this); + return false; + } + else + { + return true; + } + } + + /** + * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the + * message store. + * + * + * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that + * failed + */ + public void decrementReference() + { + int count = _refCountUpdater.decrementAndGet(this); + + // note that the operation of decrementing the reference count and then removing the message does not + // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after + // the message has been passed to all queues. i.e. we are + // not relying on the all the increments having taken place before the delivery manager decrements. + if (count == 0) + { + // set the reference count way below 0 so that we can detect that the message has been deleted + // this is to guard against the message being spontaneously recreated (from the mgmt console) + // by copying from other queues at the same time as it is being removed. + _refCountUpdater.set(this,Integer.MIN_VALUE/2); + + // must check if the handle is null since there may be cases where we decide to throw away a message + // and the handle has not yet been constructed + if (_storedMessage != null) + { + _storedMessage.remove(); + } + } + else + { + if (count < 0) + { + throw new RuntimeException("Reference count for message id " + getMessageNumber() + + " has gone below 0."); + } + } } public static class Reference extends MessageReference<Message_1_0> @@ -154,13 +244,13 @@ public class Message_1_0 implements Serv protected void onReference(Message_1_0 message) { - + message.incrementReference(); } protected void onRelease(Message_1_0 message) { - + message.decrementReference(); } -} + } } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Fri Jan 25 15:38:45 2013 @@ -1793,8 +1793,9 @@ public class DerbyMessageStore implement public ByteBuffer getContent(int offsetInMessage, int size) { ByteBuffer buf = ByteBuffer.allocate(size); - getContent(offsetInMessage, buf); + int length = getContent(offsetInMessage, buf); buf.position(0); + buf.limit(length); return buf; } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Jan 25 15:38:45 2013 @@ -41,8 +41,10 @@ import org.apache.qpid.server.logging.su import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -75,7 +77,7 @@ public class VirtualHostConfigRecoveryHa private final VirtualHost _virtualHost; private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>(); + private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>(); private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>(); private MessageStoreLogSubject _logSubject; @@ -167,7 +169,7 @@ public class VirtualHostConfigRecoveryHa public void message(StoredMessage message) { - AbstractServerMessageImpl serverMessage; + ServerMessage serverMessage; switch(message.getMetaData().getType()) { case META_DATA_0_8: @@ -176,6 +178,9 @@ public class VirtualHostConfigRecoveryHa case META_DATA_0_10: serverMessage = new MessageTransferMessage(message, null); break; + case META_DATA_1_0: + serverMessage = new Message_1_0(message); + break; default: throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); } @@ -206,12 +211,13 @@ public class VirtualHostConfigRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) { - message.incrementReference(); + final MessageReference ref = message.newReference(); + branch.enqueue(queue,message); @@ -224,7 +230,7 @@ public class VirtualHostConfigRecoveryHa { queue.enqueue(message, true, null); - message.decrementReference(); + ref.release(); } catch (AMQException e) { @@ -236,7 +242,7 @@ public class VirtualHostConfigRecoveryHa public void onRollback() { - message.decrementReference(); + ref.release(); } }); } @@ -265,7 +271,7 @@ public class VirtualHostConfigRecoveryHa if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); - final AbstractServerMessageImpl message = _recoveredMessages.get(messageId); + final ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); if(message != null) Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java?rev=1438556&r1=1438555&r2=1438556&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java Fri Jan 25 15:38:45 2013 @@ -27,6 +27,7 @@ import org.apache.qpid.framing.ContentHe import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.test.utils.QpidTestCase; /** @@ -86,10 +87,12 @@ public class ReferenceCountingTest exten AMQMessage message = new AMQMessage(storedMessage); - message.incrementReference(); + MessageReference ref = message.newReference(); assertEquals(1, _store.getMessageCount()); - message.decrementReference(); + + ref.release(); + assertEquals(0, _store.getMessageCount()); } @@ -142,13 +145,13 @@ public class ReferenceCountingTest exten AMQMessage message = new AMQMessage(storedMessage); - message.incrementReference(); + MessageReference ref = message.newReference(); // we call routing complete to set up the handle // message.routingComplete(_store, _storeContext, new MessageHandleFactory()); assertEquals(1, _store.getMessageCount()); - message.incrementReference(); - message.decrementReference(); + MessageReference ref2 = message.newReference(); + ref.release(); assertEquals(1, _store.getMessageCount()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org