Author: ritchiem
Date: Fri Apr 17 13:17:19 2009
New Revision: 765984

URL: http://svn.apache.org/viewvc?rev=765984&view=rev
Log:
QPID-1807 : Copied the broker from trunk and update SlowMessageStore to use 
MessageStores rather than TransactionLogs.

Merge from trunk inlucding recent fixes 

This mirrors the change done in r764850 in bring the broker from 0.5-fix to 
trunk


Added:
    qpid/branches/0.5-release/qpid/java/broker/
      - copied from r765967, qpid/trunk/qpid/java/broker/
Modified:
    
qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java

Modified: 
qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=765984&r1=765983&r2=765984&view=diff
==============================================================================
--- 
qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
 (original)
+++ 
qpid/branches/0.5-release/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
 Fri Apr 17 13:17:19 2009
@@ -31,62 +31,54 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.routing.RoutingTable;
 
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.ArrayList;
 
-public class SlowMessageStore implements TransactionLog, RoutingTable
+public class SlowMessageStore implements MessageStore
 {
     private static final Logger _logger = 
Logger.getLogger(SlowMessageStore.class);
     private static final String DELAYS = "delays";
     private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
     private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
     private long _defaultDelay = 0L;
-    private TransactionLog _realTransactionLog = new MemoryMessageStore();
-    private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog;
+    private MessageStore _realStore = new MemoryMessageStore();
     private static final String PRE = "pre";
     private static final String POST = "post";
     private String DEFAULT_DELAY = "default";
 
-    public Object configure(VirtualHost virtualHost, String base, 
VirtualHostConfiguration config) throws Exception
+    public void configure(VirtualHost virtualHost, String base, 
VirtualHostConfiguration config) throws Exception
     {
-        _logger.warn("Starting SlowMessageStore on Virtualhost:" + 
virtualHost.getName());
+        _logger.info("Starting SlowMessageStore on Virtualhost:" + 
virtualHost.getName());
         Configuration delays = config.getStoreConfiguration().subset(DELAYS);
 
         configureDelays(delays);
 
-        String transactionLogClass = config.getTransactionLogClass();
+        String messageStoreClass = 
config.getStoreConfiguration().getString("realStore");
 
         if (delays.containsKey(DEFAULT_DELAY))
         {
             _defaultDelay = delays.getLong(DEFAULT_DELAY);
-            _logger.warn("Delay is:" + _defaultDelay);
         }
 
-        if (transactionLogClass != null)
+        if (messageStoreClass != null)
         {
-            Class clazz = Class.forName(transactionLogClass);
-            if (clazz != this.getClass())
-            {
+            Class clazz = Class.forName(messageStoreClass);
 
-                Object o = clazz.newInstance();
+            Object o = clazz.newInstance();
 
-                if (!(o instanceof TransactionLog))
-                {
-                    throw new ClassCastException("TransactionLog class must 
implement " + TransactionLog.class + ". Class " + clazz +
-                    " does not.");
-                }
-                _realTransactionLog = (TransactionLog) o;
+            if (!(o instanceof MessageStore))
+            {
+                throw new ClassCastException("Message store class must 
implement " + MessageStore.class + ". Class " + clazz +
+                                             " does not.");
             }
+            _realStore = (MessageStore) o;
+            _realStore.configure(virtualHost, base + ".store", config);
+        }
+        else
+        {
+            _realStore.configure(virtualHost, base + ".store", config);
         }
-
-        // The call to configure may return a new transaction log 
-        _realTransactionLog = (TransactionLog) 
_realTransactionLog.configure(virtualHost, base , config);
-        
-        return this;
     }
 
     private void configureDelays(Configuration config)
@@ -159,35 +151,42 @@
     public void close() throws Exception
     {
         doPreDelay("close");
-        _realTransactionLog.close();
+        _realStore.close();
         doPostDelay("close");
     }
 
+    public void removeMessage(StoreContext storeContext, Long messageId) 
throws AMQException
+    {
+        doPreDelay("removeMessage");
+        _realStore.removeMessage(storeContext, messageId);
+        doPostDelay("removeMessage");
+    }
+
     public void createExchange(Exchange exchange) throws AMQException
     {
         doPreDelay("createExchange");
-        _realRoutingTable.createExchange(exchange);
+        _realStore.createExchange(exchange);
         doPostDelay("createExchange");
     }
 
     public void removeExchange(Exchange exchange) throws AMQException
     {
         doPreDelay("removeExchange");
-        _realRoutingTable.removeExchange(exchange);
+        _realStore.removeExchange(exchange);
         doPostDelay("removeExchange");
     }
 
     public void bindQueue(Exchange exchange, AMQShortString routingKey, 
AMQQueue queue, FieldTable args) throws AMQException
     {
         doPreDelay("bindQueue");
-        _realRoutingTable.bindQueue(exchange, routingKey, queue, args);
+        _realStore.bindQueue(exchange, routingKey, queue, args);
         doPostDelay("bindQueue");
     }
 
     public void unbindQueue(Exchange exchange, AMQShortString routingKey, 
AMQQueue queue, FieldTable args) throws AMQException
     {
         doPreDelay("unbindQueue");
-        _realRoutingTable.unbindQueue(exchange, routingKey, queue, args);
+        _realStore.unbindQueue(exchange, routingKey, queue, args);
         doPostDelay("unbindQueue");
     }
 
@@ -199,100 +198,101 @@
     public void createQueue(AMQQueue queue, FieldTable arguments) throws 
AMQException
     {
         doPreDelay("createQueue");
-        _realRoutingTable.createQueue(queue, arguments);
+        _realStore.createQueue(queue, arguments);
         doPostDelay("createQueue");
     }
 
     public void removeQueue(AMQQueue queue) throws AMQException
     {
         doPreDelay("removeQueue");
-        _realRoutingTable.removeQueue(queue);
+        _realStore.removeQueue(queue);
         doPostDelay("removeQueue");
     }
 
-    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> 
queues, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, AMQQueue queue, Long 
messageId) throws AMQException
     {
         doPreDelay("enqueueMessage");
-        _realTransactionLog.enqueueMessage(context, queues, messageId);
+        _realStore.enqueueMessage(context, queue, messageId);
         doPostDelay("enqueueMessage");
     }
 
     public void dequeueMessage(StoreContext context, AMQQueue queue, Long 
messageId) throws AMQException
     {
         doPreDelay("dequeueMessage");
-        _realTransactionLog.dequeueMessage(context, queue, messageId);
-        doPostDelay("dequeueMessage");
-    }
-
-    public void removeMessage(StoreContext context, Long messageId) throws 
AMQException
-    {
-        doPreDelay("dequeueMessage");
-        _realTransactionLog.removeMessage(context, messageId);
+        _realStore.dequeueMessage(context, queue, messageId);
         doPostDelay("dequeueMessage");
     }
 
     public void beginTran(StoreContext context) throws AMQException
     {
         doPreDelay("beginTran");
-        _realTransactionLog.beginTran(context);
+        _realStore.beginTran(context);
         doPostDelay("beginTran");
     }
 
     public void commitTran(StoreContext context) throws AMQException
     {
         doPreDelay("commitTran");
-        _realTransactionLog.commitTran(context);
+        _realStore.commitTran(context);
         doPostDelay("commitTran");
     }
 
     public void abortTran(StoreContext context) throws AMQException
     {
         doPreDelay("abortTran");
-        _realTransactionLog.abortTran(context);
+        _realStore.abortTran(context);
         doPostDelay("abortTran");
     }
 
     public boolean inTran(StoreContext context)
     {
         doPreDelay("inTran");
-        boolean b = _realTransactionLog.inTran(context);
+        boolean b = _realStore.inTran(context);
         doPostDelay("inTran");
         return b;
     }
 
+    public Long getNewMessageId()
+    {
+        doPreDelay("getNewMessageId");
+        Long l = _realStore.getNewMessageId();
+        doPostDelay("getNewMessageId");
+        return l;
+    }
+
     public void storeContentBodyChunk(StoreContext context, Long messageId, 
int index, ContentChunk contentBody, boolean lastContentBody) throws 
AMQException
     {
         doPreDelay("storeContentBodyChunk");
-        _realTransactionLog.storeContentBodyChunk(context, messageId, index, 
contentBody, lastContentBody);
+        _realStore.storeContentBodyChunk(context, messageId, index, 
contentBody, lastContentBody);
         doPostDelay("storeContentBodyChunk");
     }
 
     public void storeMessageMetaData(StoreContext context, Long messageId, 
MessageMetaData messageMetaData) throws AMQException
     {
         doPreDelay("storeMessageMetaData");
-        _realTransactionLog.storeMessageMetaData(context, messageId, 
messageMetaData);
+        _realStore.storeMessageMetaData(context, messageId, messageMetaData);
         doPostDelay("storeMessageMetaData");
     }
 
-//    public MessageMetaData getMessageMetaData(StoreContext context, Long 
messageId) throws AMQException
-//    {
-//        doPreDelay("getMessageMetaData");
-//        MessageMetaData mmd = 
_realTransactionLog.getMessageMetaData(context, messageId);
-//        doPostDelay("getMessageMetaData");
-//        return mmd;
-//    }
-//
-//    public ContentChunk getContentBodyChunk(StoreContext context, Long 
messageId, int index) throws AMQException
-//    {
-//        doPreDelay("getContentBodyChunk");
-//        ContentChunk c = _realTransactionLog.getContentBodyChunk(context, 
messageId, index);
-//        doPostDelay("getContentBodyChunk");
-//        return c;
-//    }
-//
+    public MessageMetaData getMessageMetaData(StoreContext context, Long 
messageId) throws AMQException
+    {
+        doPreDelay("getMessageMetaData");
+        MessageMetaData mmd = _realStore.getMessageMetaData(context, 
messageId);
+        doPostDelay("getMessageMetaData");
+        return mmd;
+    }
+
+    public ContentChunk getContentBodyChunk(StoreContext context, Long 
messageId, int index) throws AMQException
+    {
+        doPreDelay("getContentBodyChunk");
+        ContentChunk c = _realStore.getContentBodyChunk(context, messageId, 
index);
+        doPostDelay("getContentBodyChunk");
+        return c;
+    }
+
     public boolean isPersistent()
     {
-        return _realTransactionLog.isPersistent();
+        return _realStore.isPersistent();
     }
 
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to