Author: aconway
Date: Fri Sep 30 20:56:15 2011
New Revision: 1177833

URL: http://svn.apache.org/viewvc?rev=1177833&view=rev
Log:
QPID-2920: Send messages frame by frame.

The sender picks a channel number unique within that sender. Messages
are sent over CPG frame-by-frame and assembled based on the sender and
channel number. Channel numbers can be re-used once the send is complete.

Added:
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
   (with props)
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h  
 (with props)
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h   
(with props)
Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
    
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk Fri Sep 30 20:56:15 
2011
@@ -126,6 +126,8 @@ cluster2_la_SOURCES =                               \
        qpid/cluster/exp/hash.h                 \
        qpid/cluster/exp/HandlerBase.cpp        \
        qpid/cluster/exp/HandlerBase.h          \
+       qpid/cluster/exp/MessageBuilders.cpp    \
+       qpid/cluster/exp/MessageBuilders.h      \
        qpid/cluster/exp/MessageHandler.cpp     \
        qpid/cluster/exp/MessageHandler.h       \
        qpid/cluster/exp/Multicaster.cpp        \
@@ -138,6 +140,7 @@ cluster2_la_SOURCES =                               \
        qpid/cluster/exp/QueueReplica.h         \
        qpid/cluster/exp/Settings.cpp           \
        qpid/cluster/exp/Settings.h             \
+       qpid/cluster/exp/UniqueIds.h            \
        qpid/cluster/exp/WiringHandler.cpp      \
        qpid/cluster/exp/WiringHandler.h
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.h Fri Sep 30 
20:56:15 2011
@@ -113,6 +113,8 @@ class Cpg : public sys::IOHandle {
     MemberId self() const;
 
     int getFd();
+
+    std::string getName() const { return str(group); }
     
   private:
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp 
Fri Sep 30 20:56:15 2011
@@ -41,6 +41,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/log/Statement.h"
+#include <boost/bind.hpp>
 
 namespace qpid {
 namespace cluster {
@@ -84,15 +85,26 @@ BrokerContext::BrokerContext(Core& c) : 
 
 BrokerContext::~BrokerContext() {}
 
+
+namespace {
+void sendFrame(Multicaster& mcaster, const AMQFrame& frame, uint16_t channel) {
+    AMQFrame copy(frame);
+    copy.setChannel(channel);
+    mcaster.mcast(copy);
+}
+}
+
 bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& 
msg)
 {
     if (!tssReplicate) return true;
-    // FIXME aconway 2010-10-20: replicate message in fragments
-    // (frames), using fixed size bufffers.
-    std::string data(msg->encodedSize(),char());
-    framing::Buffer buf(&data[0], data.size());
-    msg->encode(buf);
-    mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), data));
+    // FIXME aconway 2011-09-29: for async completion the
+    // UniqueIds::release must move to self-delivery so we can
+    // identify the same message.
+    UniqueIds<uint16_t>::Scope s(channels);
+    uint16_t channel = s.id;
+    mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), 
channel));
+    std::for_each(msg->getFrames().begin(), msg->getFrames().end(),
+                  boost::bind(&sendFrame, boost::ref(mcaster(queue)), _1, 
channel));
     return false; // Strict order, wait for CPG self-delivery to enqueue.
 }
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h 
Fri Sep 30 20:56:15 2011
@@ -22,6 +22,7 @@
  *
  */
 
+#include "UniqueIds.h"
 #include "qpid/broker/Cluster.h"
 #include "qpid/sys/AtomicValue.h"
 
@@ -85,6 +86,7 @@ class BrokerContext : public broker::Clu
     Multicaster& mcaster(const std::string&);
 
     Core& core;
+    UniqueIds<uint16_t> channels;
 };
 }} // namespace qpid::cluster
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp 
Fri Sep 30 20:56:15 2011
@@ -62,26 +62,26 @@ void EventHandler::deliver(
     sender = MemberId(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     framing::AMQFrame frame;
+    // FIXME aconway 2011-09-29: don't decode own frame bodies. Ignore based 
on channel.
     while (buf.available()) {
         frame.decode(buf);
-        assert(frame.getBody());
-        QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " "
-                 << *frame.getBody());
+        QPID_LOG(trace, "cluster deliver on " << cpg.getName() << " from "<< 
PrettyId(sender, self) << ": " << frame);
         try {
-            invoke(*frame.getBody());
+            handle(frame);
         } catch (const std::exception& e) {
             // Note: exceptions are assumed to be survivable,
             // fatal errors should log a message and call Core::fatal.
             QPID_LOG(error, e.what());
+            // FIXME aconway 2011-09-29: error handling
         }
     }
 }
 
-void EventHandler::invoke(const framing::AMQBody& body) {
+void EventHandler::handle(const framing::AMQFrame& frame) {
     for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i)
-        if ((*i)->invoke(body)) return;
-    QPID_LOG(error, "Cluster received unknown control: " << body );
-    assert(0);                  // Error handling
+        if ((*i)->handle(frame)) return;
+    QPID_LOG(error, "Cluster received unknown frame: " << frame );
+    assert(0);             // FIXME aconway 2011-09-29: Error handling
 }
 
 struct PrintAddrs {

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h 
Fri Sep 30 20:56:15 2011
@@ -49,7 +49,6 @@ class EventHandler : public Cpg::Handler
   public:
     EventHandler(boost::shared_ptr<sys::Poller> poller,
                  boost::function<void()> onError);
-    
     ~EventHandler();
 
     /** Add a handler */
@@ -79,7 +78,7 @@ class EventHandler : public Cpg::Handler
     Cpg& getCpg() { return cpg; }
 
   private:
-    void invoke(const framing::AMQBody& body);
+    void handle(const framing::AMQFrame&);
 
     Cpg cpg;
     PollerDispatch dispatcher;

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h 
Fri Sep 30 20:56:15 2011
@@ -28,6 +28,7 @@ namespace qpid {
 
 namespace framing {
 class AMQBody;
+class AMQFrame;
 }
 
 namespace cluster {
@@ -42,7 +43,7 @@ class HandlerBase : public RefCounted
     HandlerBase(EventHandler&);
     virtual ~HandlerBase();
 
-    virtual bool invoke(const framing::AMQBody& body) = 0;
+    virtual bool handle(const framing::AMQFrame&) = 0;
     virtual void left(const MemberId&) {}
     virtual void joined(const MemberId&) {}
 

Added: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp?rev=1177833&view=auto
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
 (added)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
 Fri Sep 30 20:56:15 2011
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageBuilders.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace cluster {
+
+MessageBuilders::MessageBuilders(broker::MessageStore* s) : store(s) {}
+
+void MessageBuilders::announce(MemberId sender, uint16_t channel,
+                               const boost::shared_ptr<broker::Queue>& queue)
+{
+    ChannelId key(sender, channel);
+    if (map.find(key) != map.end())
+        throw Exception(
+            QPID_MSG("MessageBuilder channel " << channel << " on " << sender
+                     << " is already assigned."));
+    map[key] = std::make_pair(queue, new broker::MessageBuilder(store));
+}
+
+bool MessageBuilders::handle(
+    MemberId sender,
+    const framing::AMQFrame& frame,
+    boost::shared_ptr<broker::Queue>& queueOut,
+    boost::intrusive_ptr<broker::Message>& messageOut)
+{
+    ChannelId key(sender, frame.getChannel());
+    Map::iterator i = map.find(key);
+    if (i == map.end())
+        throw Exception(QPID_MSG("MessageBuilder channel " << 
frame.getChannel()
+                                 << " on " << sender << " is not assigned."));
+    boost::shared_ptr<broker::MessageBuilder> msgBuilder = i->second.second;
+    // Nasty bit of code pasted from broker::SessionState::handleContent.
+    // Should really be part of broker::MessageBuilder
+    if (frame.getBof() && frame.getBos()) //start of frameset
+        msgBuilder->start(0);
+    msgBuilder->handle(const_cast<framing::AMQFrame&>(frame));
+    if (frame.getEof() && frame.getEos()) { //end of frameset
+        if (frame.getBof()) {
+            //i.e this is a just a command frame, add a dummy header
+            framing::AMQFrame header((framing::AMQHeaderBody()));
+            header.setBof(false);
+            header.setEof(false);
+            msgBuilder->getMessage()->getFrames().append(header);
+        }
+        queueOut = i->second.first;
+        messageOut = msgBuilder->getMessage();
+        map.erase(key);
+        return true;
+    }
+    return false;
+}
+
+}} // namespace qpid::cluster

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h?rev=1177833&view=auto
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h 
(added)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h 
Fri Sep 30 20:56:15 2011
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_EXP_MESSAGEBUILDERS_H
+#define QPID_CLUSTER_EXP_MESSAGEBUILDERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/cluster/types.h"
+#include "qpid/broker/MessageBuilder.h"
+#include <map>
+
+namespace qpid {
+namespace broker {
+class Queue;
+}
+namespace framing {
+class AMQFrame;
+}
+
+namespace cluster {
+
+/**
+ * Build messages received by CPG delivery.
+ */
+class MessageBuilders
+{
+  public:
+    MessageBuilders(broker::MessageStore* store);
+
+    /** Announce a message for queue arriving on channel from sender. */
+    void announce(MemberId sender, uint16_t channel,
+                  const boost::shared_ptr<broker::Queue>&);
+
+    /** Add a frame to the message in progress.
+     *@param sender member that sent the frame.
+     *@param frame is the frame to add.
+     *@param queueOut set to the queue if message complete.
+     *@param messageOut set to message if message complete.
+     *@return True if the frame completes a message
+     */
+    bool handle(MemberId sender, const framing::AMQFrame& frame,
+                boost::shared_ptr<broker::Queue>& queueOut,
+                boost::intrusive_ptr<broker::Message>& messageOut);
+
+  private:
+    typedef std::pair<MemberId, uint16_t> ChannelId;
+    typedef std::pair<boost::shared_ptr<broker::Queue>,
+                      boost::shared_ptr<broker::MessageBuilder> > QueueBuilder;
+    typedef std::map<ChannelId, QueueBuilder> Map;
+    Map map;
+    broker::MessageStore* store;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_MESSAGEBUILDERS_H*/

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageBuilders.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp 
Fri Sep 30 20:56:15 2011
@@ -31,22 +31,43 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/Buffer.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace cluster {
+
 using namespace broker;
+using namespace framing;
 
 MessageHandler::MessageHandler(EventHandler& e, Core& c) :
     HandlerBase(e),
     broker(c.getBroker()),
-    core(c)
+    core(c),
+    messageBuilders(&c.getBroker().getStore())
 {}
 
-bool MessageHandler::invoke(const framing::AMQBody& body) {
-    return framing::invoke(*this, body).wasHandled();
+bool MessageHandler::handle(const framing::AMQFrame& frame) {
+    assert(frame.getBody());
+    const AMQBody& body = *frame.getBody();
+    if (framing::invoke(*this, body).wasHandled()) return true;
+    // Test for message frame
+    if (body.type() == HEADER_BODY || body.type() == CONTENT_BODY ||
+        (body.getMethod() && body.getMethod()->isA<MessageTransferBody>()))
+    {
+        boost::shared_ptr<broker::Queue> queue;
+        boost::intrusive_ptr<broker::Message> message;
+        if (messageBuilders.handle(sender(), frame, queue, message)) {
+            BrokerContext::ScopedSuppressReplication ssr;
+            queue->deliver(message);
+        }
+        // FIXME aconway 2011-09-29: async completion goes here.
+        // For own messages need to release the channel assigned by 
BrokerContext.
+        return true;
+    }
+    return false;
 }
 
 boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
@@ -57,17 +78,10 @@ boost::shared_ptr<broker::Queue> Message
     return queue;
 }
 
-void MessageHandler::enqueue(const std::string& q, const std::string& message) 
{
-
+void MessageHandler::enqueue(const std::string& q, uint16_t channel) {
     boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
-    // FIXME aconway 2010-10-28: decode message by frame in bounded-size 
buffers.
     // FIXME aconway 2011-09-28: don't re-decode my own messages
-    boost::intrusive_ptr<broker::Message> msg = new broker::Message();
-    framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
-    msg->decodeHeader(buf);
-    msg->decodeContent(buf);
-    BrokerContext::ScopedSuppressReplication ssr;
-    queue->deliver(msg);
+    messageBuilders.announce(sender(), channel, queue);
 }
 
 // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h 
Fri Sep 30 20:56:15 2011
@@ -25,6 +25,7 @@
 // TODO aconway 2010-10-19: experimental cluster code.
 
 #include "HandlerBase.h"
+#include "MessageBuilders.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include <boost/intrusive_ptr.hpp>
 #include <map>
@@ -54,9 +55,9 @@ class MessageHandler : public framing::A
   public:
     MessageHandler(EventHandler&, Core&);
 
-    bool invoke(const framing::AMQBody& body);
+    bool handle(const framing::AMQFrame&);
 
-    void enqueue(const std::string& queue, const std::string& message);
+    void enqueue(const std::string& queue, uint16_t channel);
     void acquire(const std::string& queue, uint32_t position);
     void dequeue(const std::string& queue, uint32_t position);
     void requeue(const std::string& queue, uint32_t position, bool 
redelivered);
@@ -66,6 +67,7 @@ class MessageHandler : public framing::A
 
     broker::Broker& broker;
     Core& core;
+    MessageBuilders messageBuilders;
 };
 }} // namespace qpid::cluster
 

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp 
Fri Sep 30 20:56:15 2011
@@ -52,7 +52,7 @@ Multicaster::Multicaster(Cpg& cpg_,
 }
 
 void Multicaster::mcast(const framing::AMQFrame& data) {
-    QPID_LOG(trace, "cluster multicast: " << data);
+    QPID_LOG(trace, "cluster multicast on " << cpg.getName() << ": " << data);
     BufferRef bufRef = buffers.get(data.encodedSize());
     framing::Buffer buf(bufRef.begin(), bufRef.size());
     data.encode(buf);

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp 
Fri Sep 30 20:56:15 2011
@@ -108,7 +108,7 @@ void QueueContext::timeout() {
     queue.stopConsumers();
 }
 
-// Callback set up by queue.stopConsumers() called in connection thread.
+// Callback set up by queue.stopConsumers() called in connection or timer 
thread.
 // Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
     sys::Mutex::ScopedLock l(lock);

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp 
Fri Sep 30 20:56:15 2011
@@ -36,8 +36,8 @@ namespace cluster {
 QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s)
     : HandlerBase(eh), multicaster(m),  consumeLock(s.getConsumeLock()) {}
 
-bool QueueHandler::invoke(const framing::AMQBody& body) {
-    return framing::invoke(*this, body).wasHandled();
+bool QueueHandler::handle(const framing::AMQFrame& frame) {
+    return framing::invoke(*this, *frame.getBody()).wasHandled();
 }
 
 void QueueHandler::subscribe(const std::string& queue) {

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h 
(original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h 
Fri Sep 30 20:56:15 2011
@@ -56,7 +56,7 @@ class QueueHandler : public framing::AMQ
   public:
     QueueHandler(EventHandler&, Multicaster&, const Settings&);
 
-    bool invoke(const framing::AMQBody& body);
+    bool handle(const framing::AMQFrame& body);
 
     // Events
     void subscribe(const std::string& queue);

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h?rev=1177833&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h 
(added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h 
Fri Sep 30 20:56:15 2011
@@ -0,0 +1,70 @@
+#ifndef QPID_CLUSTER_EXP_UNIQUEIDS_H
+#define QPID_CLUSTER_EXP_UNIQUEIDS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Assign ID numbers, ensuring that all the assigned IDs are unique
+ * T is the numeric type - actually any type with >, == and ++ will do.
+ */
+template <class T> class UniqueIds
+{
+  public:
+    /** Get an ID that is different from all other active IDs.
+     *@return the ID, which is now considered active.
+     */
+    T get() {
+        sys::Mutex::ScopedLock l(lock);
+        T old = mark;
+        while (active.find(++mark) != active.end() && mark != old)
+            ;
+        assert(mark != old);      // check wrap-around
+        active.insert(mark);
+        return mark;
+    }
+    /** Release an ID, so it is inactive and available for re-use */
+    void release(T id) {
+        sys::Mutex::ScopedLock l(lock);
+        active.erase(id);
+    }
+    /** Allocate an ID, release automatically at end of scope */
+    struct Scope {
+        UniqueIds& ids;
+        T id;
+        Scope(UniqueIds& ids_) : ids(ids_), id(ids.get()) {}
+        ~Scope() { ids.release(id); }
+    };
+
+  private:
+    sys::Mutex lock;
+    std::set<T> active;
+    T mark;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_UNIQUEIDS_H*/

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/UniqueIds.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp 
Fri Sep 30 20:56:15 2011
@@ -50,8 +50,8 @@ WiringHandler::WiringHandler(EventHandle
     queueHandler(qh)
 {}
 
-bool WiringHandler::invoke(const framing::AMQBody& body) {
-    return framing::invoke(*this, body).wasHandled();
+bool WiringHandler::handle(const framing::AMQFrame& frame) {
+    return framing::invoke(*this, *frame.getBody()).wasHandled();
 }
 
 void WiringHandler::createQueue(const std::string& data) {

Modified: 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h 
(original)
+++ 
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h 
Fri Sep 30 20:56:15 2011
@@ -53,7 +53,7 @@ class WiringHandler : public framing::AM
   public:
     WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, 
broker::Broker&);
 
-    bool invoke(const framing::AMQBody& body);
+    bool handle(const framing::AMQFrame&);
 
     void createQueue(const std::string& data);
     void destroyQueue(const std::string& name);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml?rev=1177833&r1=1177832&r2=1177833&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml Fri Sep 30 20:56:15 
2011
@@ -333,7 +333,7 @@
   <class name="cluster-message" code="0x82">
     <control name="enqueue" code="0x2">
       <field name="queue" type="queue.name"/>
-      <field name="message" type="str32"/>
+      <field name="channel" type="uint16"/>
     </control>
 
     <control name="acquire" code="0x4">



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to