Author: robbie
Date: Tue Nov  1 11:18:06 2011
New Revision: 1195929

URL: http://svn.apache.org/viewvc?rev=1195929&view=rev
Log:
QPID-3570: update MessageTransferMessage and TransferMessageReference to use 
the reference counting system, helping ensure that persisted 0-10 message 
[meta]data is removed from the store when dequeued from all queues instead of 
just being orphaned for cleanup during store recovery

Added:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1195929&r1=1195928&r2=1195929&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
 Tue Nov  1 11:18:06 2011
@@ -30,20 +30,17 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.queue.AMQQueue;
 
 
-import java.util.concurrent.atomic.AtomicInteger;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 
 /**
  * A deliverable message.
  */
-public class AMQMessage implements ServerMessage
+public class AMQMessage extends AbstractServerMessageImpl
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
 
-    private final AtomicInteger _referenceCount = new AtomicInteger(0);
-
     /** Flag to indicate that this message requires 'immediate' delivery. */
 
     private static final byte IMMEDIATE = 0x01;
@@ -76,6 +73,8 @@ public class AMQMessage implements Serve
     
     public AMQMessage(StoredMessage<MessageMetaData> handle, 
WeakReference<AMQChannel> channelRef)
     {
+        super(handle);
+
         _handle = handle;
         final MessageMetaData metaData = handle.getMetaData();
         _size = metaData.getContentSize();
@@ -89,12 +88,6 @@ public class AMQMessage implements Serve
         _channelRef = channelRef;
     }
 
-
-    public String debugIdentity()
-    {
-        return "(HC:" + System.identityHashCode(this) + " ID:" + 
getMessageId() + " Ref:" + _referenceCount.get() + ")";
-    }
-
     public void setExpiration(final long expiration)
     {
 
@@ -102,11 +95,6 @@ public class AMQMessage implements Serve
 
     }
 
-    public boolean isReferenced()
-    {
-        return _referenceCount.get() > 0;
-    }
-
     public MessageMetaData getMessageMetaData()
     {
         return _handle.getMetaData();
@@ -117,88 +105,12 @@ public class AMQMessage implements Serve
         return getMessageMetaData().getContentHeaderBody();
     }
 
-
-
     public Long getMessageId()
     {
         return _handle.getMessageNumber();
     }
 
     /**
-     * Creates a long-lived reference to this message, and increments the 
count of such references, as an atomic
-     * operation.
-     */
-    public AMQMessage takeReference()
-    {
-        incrementReference(); // _referenceCount.incrementAndGet();
-
-        return this;
-    }
-
-    public boolean incrementReference()
-    {
-        return incrementReference(1);
-    }
-
-    /* Threadsafe. Increment the reference count on the message. */
-    public boolean incrementReference(int count)
-    {
-
-        if(_referenceCount.addAndGet(count) <= 0)
-        {
-            _referenceCount.addAndGet(-count);
-            return false;
-        }
-        else
-        {
-            return true;
-        }
-
-    }
-
-    /**
-     * Threadsafe. This will decrement the reference count and when it reaches 
zero will remove the message from the
-     * message store.
-     *
-     *
-     * @throws org.apache.qpid.server.queue.MessageCleanupException when an 
attempt was made to remove the message from the message store and that
-     *                                 failed
-     */
-    public void decrementReference()
-    {
-        int count = _referenceCount.decrementAndGet();
-
-        // note that the operation of decrementing the reference count and 
then removing the message does not
-        // have to be atomic since the ref count starts at 1 and the exchange 
itself decrements that after
-        // the message has been passed to all queues. i.e. we are
-        // not relying on the all the increments having taken place before the 
delivery manager decrements.
-        if (count == 0)
-        {
-            // set the reference count way below 0 so that we can detect that 
the message has been deleted
-            // this is to guard against the message being spontaneously 
recreated (from the mgmt console)
-            // by copying from other queues at the same time as it is being 
removed.
-            _referenceCount.set(Integer.MIN_VALUE/2);
-
-            // must check if the handle is null since there may be cases where 
we decide to throw away a message
-            // and the handle has not yet been constructed
-            if (_handle != null)
-            {
-                _handle.remove();
-
-            }
-        }
-        else
-        {
-            if (count < 0)
-            {
-                throw new RuntimeException("Reference count for message id " + 
debugIdentity()
-                                                  + " has gone below 0.");
-            }
-        }
-    }
-
-
-    /**
      * Called selectors to determin if the message has already been sent
      *
      * @return _deliveredToConsumer
@@ -323,10 +235,7 @@ public class AMQMessage implements Serve
 
     public String toString()
     {
-        // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref 
count: " + _referenceCount + "; taken : " +
-        // _taken + " by :" + _takenBySubcription;
-
-        return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref 
count: " + _referenceCount;
+        return "Message[" + debugIdentity() + "]: " + getMessageId() + "; ref 
count: " + getReferenceCount();
     }
 
     public int getContent(ByteBuffer buf, int offset)

Added: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1195929&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 (added)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 Tue Nov  1 11:18:06 2011
@@ -0,0 +1,84 @@
+package org.apache.qpid.server.message;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.store.StoredMessage;
+
+public abstract class AbstractServerMessageImpl implements ServerMessage
+{
+    private final AtomicInteger _referenceCount = new AtomicInteger(0);
+    private final StoredMessage<?> _handle;
+
+    public AbstractServerMessageImpl(StoredMessage<?> handle)
+    {
+        _handle = handle;
+    }
+
+    public boolean incrementReference()
+    {
+        return incrementReference(1);
+    }
+
+    public boolean incrementReference(int count)
+    {
+        if(_referenceCount.addAndGet(count) <= 0)
+        {
+            _referenceCount.addAndGet(-count);
+            return false;
+        }
+        else
+        {
+            return true;
+        }
+    }
+
+    /**
+     * Threadsafe. This will decrement the reference count and when it reaches 
zero will remove the message from the
+     * message store.
+     *
+     *
+     * @throws org.apache.qpid.server.queue.MessageCleanupException when an 
attempt was made to remove the message from the message store and that
+     *                                 failed
+     */
+    public void decrementReference()
+    {
+        int count = _referenceCount.decrementAndGet();
+
+        // note that the operation of decrementing the reference count and 
then removing the message does not
+        // have to be atomic since the ref count starts at 1 and the exchange 
itself decrements that after
+        // the message has been passed to all queues. i.e. we are
+        // not relying on the all the increments having taken place before the 
delivery manager decrements.
+        if (count == 0)
+        {
+            // set the reference count way below 0 so that we can detect that 
the message has been deleted
+            // this is to guard against the message being spontaneously 
recreated (from the mgmt console)
+            // by copying from other queues at the same time as it is being 
removed.
+            _referenceCount.set(Integer.MIN_VALUE/2);
+
+            // must check if the handle is null since there may be cases where 
we decide to throw away a message
+            // and the handle has not yet been constructed
+            if (_handle != null)
+            {
+                _handle.remove();
+            }
+        }
+        else
+        {
+            if (count < 0)
+            {
+                throw new RuntimeException("Reference count for message id " + 
debugIdentity()
+                                                  + " has gone below 0.");
+            }
+        }
+    }
+
+    public String debugIdentity()
+    {
+        return "(HC:" + System.identityHashCode(this) + " ID:" + 
getMessageNumber() + " Ref:" + getReferenceCount() + ")";
+    }
+
+    protected int getReferenceCount()
+    {
+        return _referenceCount.get();
+    }
+}
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=1195929&r1=1195928&r2=1195929&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
 Tue Nov  1 11:18:06 2011
@@ -29,18 +29,14 @@ import java.nio.ByteBuffer;
 import java.lang.ref.WeakReference;
 
 
-public class MessageTransferMessage implements InboundMessage, ServerMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl 
implements InboundMessage
 {
-
-
     private StoredMessage<MessageMetaData_0_10> _storeMessage;
-
-
     private WeakReference<Session> _sessionRef;
 
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> 
storeMessage, WeakReference<Session> sessionRef)
     {
-
+        super(storeMessage);
         _storeMessage = storeMessage;
         _sessionRef = sessionRef;
     }
@@ -145,5 +141,4 @@ public class MessageTransferMessage impl
     {
         return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
     }
-    
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java?rev=1195929&r1=1195928&r2=1195929&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
 Tue Nov  1 11:18:06 2011
@@ -29,11 +29,11 @@ public class TransferMessageReference ex
 
     protected void onReference(MessageTransferMessage message)
     {
-
+        message.incrementReference();
     }
 
     protected void onRelease(MessageTransferMessage message)
     {
-
+        message.decrementReference();
     }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1195929&r1=1195928&r2=1195929&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 Tue Nov  1 11:18:06 2011
@@ -53,6 +53,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -172,6 +173,7 @@ public class ServerSession extends Sessi
 
                 public void postCommit()
                 {
+                    MessageReference<?> ref = message.newReference();
                     for(int i = 0; i < _queues.length; i++)
                     {
                         try
@@ -184,6 +186,7 @@ public class ServerSession extends Sessi
                             throw new RuntimeException(e);
                         }
                     }
+                    ref.release();
                 }
 
                 public void onRollback()

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java?rev=1195929&r1=1195928&r2=1195929&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/ReferenceCountingTest.java
 Tue Nov  1 11:18:06 2011
@@ -86,11 +86,7 @@ public class ReferenceCountingTest exten
 
         AMQMessage message = new AMQMessage(storedMessage);
 
-        message = message.takeReference();
-
-        // we call routing complete to set up the handle
- //       message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
-
+        message.incrementReference();
 
         assertEquals(1, _store.getMessageCount());
         message.decrementReference();
@@ -146,12 +142,12 @@ public class ReferenceCountingTest exten
         AMQMessage message = new AMQMessage(storedMessage);
 
 
-        message = message.takeReference();
+        message.incrementReference();
         // we call routing complete to set up the handle
      //   message.routingComplete(_store, _storeContext, new 
MessageHandleFactory());
 
         assertEquals(1, _store.getMessageCount());
-        message = message.takeReference();
+        message.incrementReference();
         message.decrementReference();
         assertEquals(1, _store.getMessageCount());
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to