Author: rgodfrey
Date: Fri Jan 25 15:38:45 2013
New Revision: 1438556

URL: http://svn.apache.org/viewvc?rev=1438556&view=rev
Log:
QPID-4550 : AMQP 1.0 Persistent Messages cause failure on restart

Modified:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/AMQFrame.java Fri Jan 25 15:38:45 2013
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.amqp_1_0.framing;
 
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.FrameBody;
 
 import java.nio.ByteBuffer;
@@ -65,4 +66,11 @@ public abstract class AMQFrame<T>
         return _frameBody;
     }
 
+    @Override
+    public String toString()
+    {
+        return "AMQFrame{" +
+               "frameBody=" + _frameBody +
+               '}';
+    }
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Fri Jan 25 15:38:45 2013
@@ -1534,7 +1534,8 @@ public abstract class AbstractBDBMessage
             else
             {
                 ByteBuffer buf = ByteBuffer.allocate(size);
-                getContent(offsetInMessage, buf);
+                int length = getContent(offsetInMessage, buf);
+                buf.limit(length);
                 buf.position(0);
                 return  buf;
             }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java Fri Jan 25 15:38:45 2013
@@ -342,6 +342,14 @@ public class MessageMetaData_1_0 impleme
     {
         private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();
 
+        private MetaDataFactory()
+        {
+            _typeRegistry.registerTransportLayer();
+            _typeRegistry.registerMessagingLayer();
+            _typeRegistry.registerTransactionLayer();
+            _typeRegistry.registerSecurityLayer();
+        }
+
         public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
         {
             ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -354,7 +362,8 @@ public class MessageMetaData_1_0 impleme
                 try
                 {
                     ByteBuffer encodedBuf = buf.duplicate();
-                    sections.add((Section) valueHandler.parse(buf));
+                    Object parse = valueHandler.parse(buf);
+                    sections.add((Section) parse);
                     encodedBuf.limit(buf.position());
                     encodedSections.add(encodedBuf);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Jan 25 15:38:45 2013
@@ -23,7 +23,9 @@ package org.apache.qpid.server.protocol.
 
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.message.MessageMetaData_1_0;
@@ -33,11 +35,45 @@ import org.apache.qpid.server.store.Stor
 
 public class Message_1_0 implements ServerMessage, InboundMessage
 {
+
+
+    private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
+
+    private volatile int _referenceCount = 0;
+
     private final StoredMessage<MessageMetaData_1_0> _storedMessage;
     private List<ByteBuffer> _fragments;
     private WeakReference<Session_1_0> _session;
 
 
+    public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
+    {
+        _storedMessage = storedMessage;
+        _session = null;
+        _fragments = restoreFragments(storedMessage);
+    }
+
+    private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
+    {
+        ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
+        final int FRAGMENT_SIZE = 2048;
+        int offset = 0;
+        ByteBuffer b;
+        do
+        {
+
+            b = storedMessage.getContent(offset,FRAGMENT_SIZE);
+            if(b.hasRemaining())
+            {
+                fragments.add(b);
+                offset+= b.remaining();
+            }
+        }
+        while(b.hasRemaining());
+        return fragments;
+    }
+
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
                        final List<ByteBuffer> fragments,
                        final Session_1_0 session)
@@ -142,7 +178,61 @@ public class Message_1_0 implements Serv
 
     public Session_1_0 getSession()
     {
-        return _session.get();
+        return _session == null ? null : _session.get();
+    }
+
+
+    public boolean incrementReference()
+    {
+        if(_refCountUpdater.incrementAndGet(this) <= 0)
+        {
+            _refCountUpdater.decrementAndGet(this);
+            return false;
+        }
+        else
+        {
+            return true;
+        }
+    }
+
+    /**
+     * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+     * message store.
+     *
+     *
+     * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+     *                                 failed
+     */
+    public void decrementReference()
+    {
+        int count = _refCountUpdater.decrementAndGet(this);
+
+        // note that the operation of decrementing the reference count and then removing the message does not
+        // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+        // the message has been passed to all queues. i.e. we are
+        // not relying on the all the increments having taken place before the delivery manager decrements.
+        if (count == 0)
+        {
+            // set the reference count way below 0 so that we can detect that the message has been deleted
+            // this is to guard against the message being spontaneously recreated (from the mgmt console)
+            // by copying from other queues at the same time as it is being removed.
+            _refCountUpdater.set(this,Integer.MIN_VALUE/2);
+
+            // must check if the handle is null since there may be cases where we decide to throw away a message
+            // and the handle has not yet been constructed
+            if (_storedMessage != null)
+            {
+                _storedMessage.remove();
+            }
+        }
+        else
+        {
+            if (count < 0)
+            {
+                throw new RuntimeException("Reference count for message id " + getMessageNumber()
+                                                  + " has gone below 0.");
+            }
+        }
     }
 
     public static class Reference extends MessageReference<Message_1_0>
@@ -154,13 +244,13 @@ public class Message_1_0 implements Serv
 
         protected void onReference(Message_1_0 message)
         {
-
+            message.incrementReference();
         }
 
         protected void onRelease(Message_1_0 message)
         {
-
+            message.decrementReference();
         }
 
-}
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Fri Jan 25 15:38:45 2013
@@ -1793,8 +1793,9 @@ public class DerbyMessageStore implement
         public ByteBuffer getContent(int offsetInMessage, int size)
         {
             ByteBuffer buf = ByteBuffer.allocate(size);
-            getContent(offsetInMessage, buf);
+            int length = getContent(offsetInMessage, buf);
             buf.position(0);
+            buf.limit(length);
             return  buf;
         }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Jan 25 15:38:45 2013
@@ -41,8 +41,10 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -75,7 +77,7 @@ public class VirtualHostConfigRecoveryHa
     private final VirtualHost _virtualHost;
 
     private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
-    private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+    private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
     private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
 
     private MessageStoreLogSubject _logSubject;
@@ -167,7 +169,7 @@ public class VirtualHostConfigRecoveryHa
 
     public void message(StoredMessage message)
     {
-        AbstractServerMessageImpl serverMessage;
+        ServerMessage serverMessage;
         switch(message.getMetaData().getType())
         {
             case META_DATA_0_8:
@@ -176,6 +178,9 @@ public class VirtualHostConfigRecoveryHa
             case META_DATA_0_10:
                 serverMessage = new MessageTransferMessage(message, null);
                 break;
+            case META_DATA_1_0:
+                serverMessage = new Message_1_0(message);
+                break;
             default:
                 throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
         }
@@ -206,12 +211,13 @@ public class VirtualHostConfigRecoveryHa
             if(queue != null)
             {
                 final long messageId = record.getMessage().getMessageNumber();
-                final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+                final ServerMessage message = _recoveredMessages.get(messageId);
                 _unusedMessages.remove(messageId);
 
                 if(message != null)
                 {
-                    message.incrementReference();
+                    final MessageReference ref = message.newReference();
+
 
                     branch.enqueue(queue,message);
 
@@ -224,7 +230,7 @@ public class VirtualHostConfigRecoveryHa
                             {
 
                                 queue.enqueue(message, true, null);
-                                message.decrementReference();
+                                ref.release();
                             }
                             catch (AMQException e)
                             {
@@ -236,7 +242,7 @@ public class VirtualHostConfigRecoveryHa
 
                         public void onRollback()
                         {
-                            message.decrementReference();
+                            ref.release();
                         }
                     });
                 }
@@ -265,7 +271,7 @@ public class VirtualHostConfigRecoveryHa
             if(queue != null)
             {
                 final long messageId = record.getMessage().getMessageNumber();
-                final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+                final ServerMessage message = _recoveredMessages.get(messageId);
                 _unusedMessages.remove(messageId);
 
                 if(message != null)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java?rev=1438556&r1=1438555&r2=1438556&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java Fri Jan 25 15:38:45 2013
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
@@ -86,10 +87,12 @@ public class ReferenceCountingTest exten
 
         AMQMessage message = new AMQMessage(storedMessage);
 
-        message.incrementReference();
+        MessageReference ref = message.newReference();
 
         assertEquals(1, _store.getMessageCount());
-        message.decrementReference();
+
+        ref.release();
+
         assertEquals(0, _store.getMessageCount());
     }
 
@@ -142,13 +145,13 @@ public class ReferenceCountingTest exten
         AMQMessage message = new AMQMessage(storedMessage);
 
 
-        message.incrementReference();
+        MessageReference ref = message.newReference();
         // we call routing complete to set up the handle
      //   message.routingComplete(_store, _storeContext, new MessageHandleFactory());
 
         assertEquals(1, _store.getMessageCount());
-        message.incrementReference();
-        message.decrementReference();
+        MessageReference ref2 = message.newReference();
+        ref.release();
         assertEquals(1, _store.getMessageCount());
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to