Author: aconway
Date: Fri Jan 16 09:25:18 2009
New Revision: 735059

URL: http://svn.apache.org/viewvc?rev=735059&view=rev
Log:
Separate cluster::EventHeader to allow non-copy events.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=735059&r1=735058&r2=735059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan 16 09:25:18 2009
@@ -187,7 +187,7 @@
     Mutex::ScopedLock l(lock);
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
-    Event e(Event::decode(from, buf));
+    Event e(Event::decodeCopy(from, buf));
     if (from == myId) // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
     deliver(e, l);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=735059&r1=735058&r2=735059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Jan 16 09:25:18 2009
@@ -32,28 +32,37 @@
 
 using framing::Buffer;
 
-const size_t Event::HEADER_SIZE =
+const size_t EventHeader::HEADER_SIZE =
     sizeof(uint8_t) +  // type
     sizeof(uint64_t) + // connection pointer only, CPG provides member ID.
     sizeof(uint32_t);  // payload size
 
+EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
+    : type(t), connectionId(c), size(s) {}
+
 Event::Event(EventType t, const ConnectionId& c,  size_t s)
-    : type(t), connectionId(c), size(s), store(RefCountedBuffer::create(s+HEADER_SIZE)) {
+    : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE))
+{
     encodeHeader();
 }
 
-Event Event::decode(const MemberId& m, framing::Buffer& buf) {
+void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
     if (buf.available() <= HEADER_SIZE)
         throw ClusterLeaveException("Not enough for multicast header");
-    EventType type((EventType)buf.getOctet());
+    type = (EventType)buf.getOctet();
     if(type != DATA && type != CONTROL)
         throw ClusterLeaveException("Invalid multicast event type");
-    ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
-    uint32_t size = buf.getLong();
-    Event e(type, connection, size);
-    if (buf.available() < size)
+    connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+    size = buf.getLong();
+}
+
+Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) {
+    EventHeader h;
+    h.decode(m, buf);           // Header
+    Event e(h.getType(), h.getConnectionId(), h.getSize());
+    if (buf.available() < e.size)
         throw ClusterLeaveException("Not enough data for multicast event");
-    memcpy(e.getData(), buf.getPointer() + buf.getPosition(), size);
+    memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
     return e;
 }
 
@@ -65,11 +74,16 @@
     return e;
 }
     
-void Event::encodeHeader () {
-    Buffer b(getStore(), HEADER_SIZE);
+void EventHeader::encode(Buffer& b) const {
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
     b.putLong(size);
+}
+
+// Encode my header in my buffer.
+void Event::encodeHeader () {
+    Buffer b(getStore(), HEADER_SIZE);
+    encode(b);
     assert(b.getPosition() == HEADER_SIZE);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=735059&r1=735058&r2=735059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Jan 16 09:25:18 2009
@@ -36,26 +36,44 @@
 // byte-stream data.
 // 
 
+/** Header data for a multicast event */
+class EventHeader {
+  public:
+    EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
+    void decode(const MemberId& m, framing::Buffer&);
+    void encode(framing::Buffer&) const;
+
+    EventType getType() const { return type; }
+    ConnectionId getConnectionId() const { return connectionId; }
+    MemberId getMemberId() const { return connectionId.getMember(); }
+    size_t getSize() const { return size; }
+
+    bool isCluster() const { return connectionId.getPointer() == 0; }
+    bool isConnection() const { return connectionId.getPointer() != 0; }
+
+  protected:
+    static const size_t HEADER_SIZE;
+    
+    EventType type;
+    ConnectionId connectionId;
+    size_t size;
+};
+
 /**
  * Events are sent to/received from the cluster.
  * Refcounted so they can be stored on queues.
  */
-class Event {
+class Event : public EventHeader {
   public:
     /** Create an event with a buffer that can hold size bytes plus an event header. */
     Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0);
 
     /** Create an event copied from delivered data. */
-    static Event decode(const MemberId& m, framing::Buffer&);
+    static Event decodeCopy(const MemberId& m, framing::Buffer&);
 
     /** Create an event containing a control */
     static Event control(const framing::AMQBody&, const ConnectionId&);
     
-    EventType getType() const { return type; }
-    ConnectionId getConnectionId() const { return connectionId; }
-    MemberId getMemberId() const { return connectionId.getMember(); }
-    size_t getSize() const { return size; }
-
     // Data excluding header.
     char* getData() { return store + HEADER_SIZE; }
     const char* getData() const { return store + HEADER_SIZE; }
@@ -65,19 +83,11 @@
     const char* getStore() const { return store; }
     size_t getStoreSize() { return size + HEADER_SIZE; }
     
-    bool isCluster() const { return connectionId.getPointer() == 0; }
-    bool isConnection() const { return connectionId.getPointer() != 0; }
-
     operator framing::Buffer() const;
 
   private:
-    static const size_t HEADER_SIZE;
-    
     void encodeHeader();
 
-    EventType type;
-    ConnectionId connectionId;
-    size_t size;
     RefCountedBuffer::pointer store;
 };
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=735059&r1=735058&r2=735059&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Jan 16 09:25:18 2009
@@ -234,17 +234,6 @@
     BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
 }
 
-QPID_AUTO_TEST_CASE(testUnsupported) {
-    ScopedSuppressLogging sl;
-    ClusterFixture cluster(1);
-    Client c1(cluster[0], "c1");
-    BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);    
-    Client c2(cluster[0], "c2");
-    Message  m;
-    m.getDeliveryProperties().setTtl(1);
-    BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);    
-}
-
 QPID_AUTO_TEST_CASE(testTxTransaction) {
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");


Reply via email to