Gordon Sim wrote:
There has been much discussion around the need for a higher-level c++
API that insulates an application from any protocol specific details
while allowing specific features of any given protocol to be exploited
through the abstractions and extension points that this API offers.

I should note that the direct use of the AMQP 0-10 specific API will
of course still be supported.

The API I have in mind is based on the concepts of message sources
(from which messages can be received) and message sinks (to which
messages can be sent).

To send messages on a particular session, the application obtains a
MessageProducer for that session from the chosen sink, through which
it can then send messages. To receive messages, it subscribes the
session to the chosen source. A subscription can be assoicated with a
MessageListener (for 'push' style interaction) or it can create a
MessageReceiver (for 'pull' style interactions).

Two common patterns for sources and sinks are offered though the Queue
and Topic classes which allow sources and sinks to be created for
named queues in a point-to-point pattern, or publish-subscribe
'topics'.

The attached patch offers a working sketch of such an API. The API is
currently in the qpid::client::api namespace, primarily to keep it
separate from the existing AMQP 0-10 specific API.

There is also an implementation of this high-level API in terms of the
existing 0-10 API in the qpid::client::amqp0_10 namespace, and a few
example programs to demonstrate the use of queues and topics
(examples/hla).

This is still very much work in progress intended to demonstrate the
source/sink concept and how that can be mapped to different patterns
of interaction on a given protocol implementation.

A critical aspect that I have yet to address is the nature of the
Message interface (currently this just exposes raw data with no
properties or headers). I think we also need to figure out the common,
protocol independent options that we may want to directly support
e.g. browsing v. consuming, prefetch windows, automatic acking
v. application controlled acking etc. The threading model for connection processing and session dispatching also still needs to be thought through in more detail.

All queries, comments, criticisms etc are very welcome.



Here is a simplified version of the proposal. It involves fewer classes but I think is just as flexible and extensible as the original.
/**
   This is a sketch of some highre-level patterns of use intended as a
   supplement to (not replacement of) existing API. These patterns
   hide wiring and subscription details of setting up common patterns of use.

   Main changes from Gordons proposal:
    - Fold MessageProducer into MessageSink
    - No generic MessageSource, subscribe directly to pattern classe objects.
      Listener + Subscription already provide a generic representation of the source.
    - Avoids double layer of factories.
    - Pattern classes are real objects not just static function namespaces.
    - Using LocalQueue rather than MessageReceiver here, we can argue about the name later.
    - Drop bool synchronous arg, sync/async send have different return types, async must return a Future.

   ALTERNATE IDEAS:
    Build LocalQueue into Subscription, allow a listener to be attached to a subscription
    after it is created (it will receive any queued messages)
*/

class Future {...}; // Need a future for async send.

class MessageSink {
    // NB: Send can modify the message, e.g. set its routing key or attach properties.
    void send(Message&);  
    Future sendAsync(Message&);
    // Should cancel automatically in dtor, do we need an explicit cancel?
    void cancel();
};

// Re-cast LocalQueue as a Listener so there's only 1 mechanism to deliver messages to app.
// A single subscribe(Listener) function supports both push and pull.
class LocalQueue : Listener { ... };

class Topic {
    Topic(Session, name);
    Subscription subscribe(Listener, pattern);
    Subscription subscribe(Listener, pattern1, pattern2);
    ...
    Subscription subscribe(Listener, vector<string> patterns);

    MessageSink getSink(pattern);
};

// Avoid name Queue since that has a different meaning in AMQP than the pattern described here.
// Note inherits MessageSink since there's only one sync for a SimpleQueue - built in.
class SimpleQueue : public MessageSink {
    SimpleQueue(Session, name);
    Subscription subscribe(Listener);

    // send/sendAsync/cancel from MessageSink    
};

class XQueryEvaluator : public MessageSink {
    XQueryEvaluator(Session, name, ... ?);
    Subscription subscribe(Listener, xquery);
    
    // send/sendAsync/cancel from MessageSink    
};
    
    
// ==== Example code sketches.


// SimpleQueue
int main() {
    Session s = ...;
    SimpleQueue queue(s, "foo");

    // Sending
    for (...) {
        message = ...;
        queue.send(message);
    }
    // Pull style:
    LocalQueue lq;
    queue.subscribe(lq);
    while (...) { Message m = lq.get(); ... }

    // Push style
    MyListener l;
    queue.subscribe(l);
    sessoin.run();
}

// SimpleQueue
int main() {
    Session s = ...;
    SimpleQueue queue(s, "foo");
    for (...) {
        message = ...;
        queue.send(message);
    }
    // Pull style:
    LocalQueue lq;
    queue.subscribe(lq);
    while (...) { Message m = lq.get(); ... }

    // Push style
    MyListener l;
    queue.subscribe(l);
    session.run();
}

// Topic
    
int main() {
    Session s;
    Topic topic(s, "myTopic");

    // Sending
    sendMessages(topic.getSink("usa.news"), count, "news about usa");
    sendMessages(topic.getSink("europe.news"), count, "news about europe");

    // Pull style
    LocalQueue lq;
    topic.subscribe(lq, pattern);
    for (...) { Message m = topic.get(); }

    // Push style
    MyListener l;
    queue.subscribe(l);
    session.run();
}

void sendMessages(MessageSink topic, int count, string news) {
    for (... count ...) { topic.send(Message(news); }
    }

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org

Reply via email to