Author: tross
Date: Tue Dec 6 22:41:40 2011
New Revision: 1211214

URL: http://svn.apache.org/viewvc?rev=1211214&view=rev
Log:
QPID-3663 - Updated the LVQ section of the C++ Broker book.

Modified:
    qpid/trunk/qpid/doc/book/src/LVQ.xml

Modified: qpid/trunk/qpid/doc/book/src/LVQ.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/LVQ.xml?rev=1211214&r1=1211213&r2=1211214&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/LVQ.xml (original)
+++ qpid/trunk/qpid/doc/book/src/LVQ.xml Tue Dec 6 22:41:40 2011
@@ -20,343 +20,163 @@
 -->
-<section><title>
-      LVQ
-    </title>
-
-  <section role="h2" id="LVQ-UnderstandingLVQ"><title>
-            Understanding LVQ
-          </title>
-          <para>
-            Last Value Queues are useful youUser Documentation are only
-            interested in the latest value entered into a queue. LVQ
-            semantics are typically used for things like stock symbol updates
-            when all you care about is the latest value for example.
-          </para><para>
-            Qpid C++ M4 or later supports two types of LVQ semantics:
-          </para><itemizedlist>
-            <listitem><para>LVQ
-            </para></listitem>
-            <listitem><para>LVQ_NO_BROWSE
-            </para></listitem>
-          </itemizedlist>
-        <!--h2--></section>
-
-
-        <section role="h2" id="LVQ-LVQsemantics-3A"><title>
-            LVQ semantics:
-          </title>
+<section>
+  <title>LVQ - Last Value Queue</title>
+
+  <section role="h2" id="LVQ-UnderstandingLVQ">
+    <title>Understanding LVQ</title>
+    <para>
+      A Last Value Queue is configured with the name of a message header that
+      is used as a key. The queue behaves as a normal FIFO queue with the
+      exception that when a message is enqueued, any other message in the
+      queue with the same value in the key header is removed and discarded.
+      Thus, for any given key value, the queue holds only the most recent
+      message.
+    </para>
+    <para>
+      The following example illustrates the operation of a Last Value Queue.
+      The example shows an empty queue with no consumers and a sequence of
+      produced messages. The numbers represent the key for each message.
+    </para>
+    <programlisting>
+      <empty queue>
+      1 =>
+      1
+      2 =>
+      1 2
+      3 =>
+      1 2 3
+      4 =>
+      1 2 3 4
+      2 =>
+      1 3 4 2
+      1 =>
+      3 4 2 1
+    </programlisting>
+    <para>
+      Note that the first four messages are enqueued normally in FIFO order.
+      The fifth message has key '2' and is also enqueued on the tail of the
+      queue. However the message already in the queue with the same key is
+      discarded.
+      <note>
+        <para>
+          If the set of keys used in the messages in a LVQ is constrained, the
+          number of messages in the queue shall not exceed the number of
+          distinct keys in use.
+        </para>
+      </note>
+    </para>
+    <section role="h3" id="LVQ-UnderstandingLVQ-UseCases">
+      <title>Common Use-Cases</title>
+      <itemizedlist>
+        <listitem>
           <para>
-            LVQ uses a header for a key, if the key matches it replaces the
-            message in-place in the queue except
-            a.) if the message with the matching key has been acquired
-            b.) if the message with the matching key has been browsed
-            In these two cases the message is placed into the queue in FIFO,
-            if another message with the same key is received it will the
-            'un-accessed' message with the same key will be replaced
-          </para><para>
-            These two exceptions protect the consumer from missing the last
-            update where a consumer or browser accesses a message and an
-            update comes with the same key.
-          </para><para>
-            An example
+            LVQ with zero or one consuming subscriptions - In this case, if
+            the consumer drops momentarily or is slower than the producer(s),
+            it will only receive current information relative to the message
+            keys.
           </para>
-            <programlisting>
-[localhost tests]$ ./lvqtest --mode create_lvq
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fffdf3f3180
-Sending Data: key2=key2.0x7fffdf3f3180
-Sending Data: key3=key3.0x7fffdf3f3180
-Sending Data: key1=key1.0x7fffdf3f3180
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fffdf3f3180
-Receiving Data:key2.0x7fffdf3f3180
-Receiving Data:key3.0x7fffdf3f3180
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fffe4c7fa10
-Sending Data: key2=key2.0x7fffe4c7fa10
-Sending Data: key3=key3.0x7fffe4c7fa10
-Sending Data: key1=key1.0x7fffe4c7fa10
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fffe4c7fa10
-Receiving Data:key2.0x7fffe4c7fa10
-Receiving Data:key3.0x7fffe4c7fa10
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode consume
-Receiving Data:key1.0x7fffdf3f3180
-Receiving Data:key2.0x7fffdf3f3180
-Receiving Data:key3.0x7fffdf3f3180
-Receiving Data:last
-Receiving Data:key1.0x7fffe4c7fa10
-Receiving Data:key2.0x7fffe4c7fa10
-Receiving Data:key3.0x7fffe4c7fa10
-Receiving Data:last
-</programlisting>
-<!--h2--></section>
-        <section role="h2" id="LVQ-LVQNOBROWSEsemantics-3A"><title>
-            LVQ_NO_BROWSE
-            semantics:
-          </title>
+        </listitem>
+        <listitem>
           <para>
-            LVQ uses a header for a key, if the key matches it replaces the
-            message in-place in the queue except
-            a.) if the message with the matching key has been acquired
-            In these two cases the message is placed into the queue in FIFO,
-            if another message with the same key is received it will the
-            'un-accessed' message with the same key will be replaced
-          </para><para>
-            Note, in this case browsed messaged are not invalidated, so
-            updates can be missed.
-          </para><para>
-            An example
+            LVQ with zero or more browsing subscriptions - A browsing consumer
+            can subscribe to the LVQ and get an immediate dump of all of the
+            "current" messages and track updates thereafter. Any number of
+            independent browsers can subscribe to the same LVQ with the same
+            effect. Since messages are never consumed, they only disappear
+            when replaced with a newer message with the same key or when their
+            TTL expires.
           </para>
-            <programlisting>
-[localhost tests]$ ./lvqtest --mode create_lvq_no_browse
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fffce5fb390
-Sending Data: key2=key2.0x7fffce5fb390
-Sending Data: key3=key3.0x7fffce5fb390
-Sending Data: key1=key1.0x7fffce5fb390
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fff346ae440
-Sending Data: key2=key2.0x7fff346ae440
-Sending Data: key3=key3.0x7fff346ae440
-Sending Data: key1=key1.0x7fff346ae440
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fff346ae440
-Receiving Data:key2.0x7fff346ae440
-Receiving Data:key3.0x7fff346ae440
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode browse
-Receiving Data:key1.0x7fff346ae440
-Receiving Data:key2.0x7fff346ae440
-Receiving Data:key3.0x7fff346ae440
-Receiving Data:last
-[localhost tests]$ ./lvqtest --mode write
-Sending Data: key1=key1.0x7fff606583e0
-Sending Data: key2=key2.0x7fff606583e0
-Sending Data: key3=key3.0x7fff606583e0
-Sending Data: key1=key1.0x7fff606583e0
-Sending Data: last=last
-[localhost tests]$ ./lvqtest --mode consume
-Receiving Data:key1.0x7fff606583e0
-Receiving Data:key2.0x7fff606583e0
-Receiving Data:key3.0x7fff606583e0
-Receiving Data:last
-[localhost tests]$
-
-</programlisting>
-        <!--h2--></section>
-        <section role="h2" id="LVQ-Examplesource"><title>
-            LVQ Program Example
-          </title>
-
-            <programlisting>
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-#include <qpid/client/AsyncSession.h>
-#include <qpid/client/Connection.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/Message.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/QueueOptions.h>
-
-#include <iostream>
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace qpid;
-using namespace std;
-
-
-enum Mode { CREATE_LVQ, CREATE_LVQ_NO_BROWSE, WRITE, BROWSE, CONSUME};
-const char* modeNames[] = { "create_lvq","create_lvq_no_browse","write","browse","consume" };
-
-// istream/ostream ops so Options can read/display Mode.
-istream& operator>>(istream& in, Mode& mode) {
-    string s;
-    in >> s;
-    int i = find(modeNames, modeNames+5, s) - modeNames;
-    if (i >= 5) throw Exception("Invalid mode: "+s);
-    mode = Mode(i);
-    return in;
-}
-
-ostream& operator<<(ostream& out, Mode mode) {
-    return out << modeNames[mode];
-}
-
-struct Args : public qpid::Options,
-              public qpid::client::ConnectionSettings
-{
-    bool help;
-    Mode mode;
-
-    Args() : qpid::Options("Simple latency test optins"), help(false), mode(BROWSE)
-    {
-        using namespace qpid;
-        addOptions()
-            ("help", optValue(help), "Print this usage statement")
-            ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
-            ("port,p", optValue(port, "PORT"), "Broker port to connect to")
-            ("username", optValue(username, "USER"), "user name for broker log in.")
-            ("password", optValue(password, "PASSWORD"), "password for broker log in.")
-            ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
-            ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay")
-            ("mode", optValue(mode, "'see below'"), "Action mode."
-             "\ncreate_lvq: create a new queue of type lvq.\n"
-             "\ncreate_lvq_no_browse: create a new queue of type lvq with no lvq on browse.\n"
-             "\nwrite: write a bunch of data & keys.\n"
-             "\nbrowse: browse the queue.\n"
-             "\nconsume: consume from the queue.\n");
-    }
-};
-
-class Listener : public MessageListener
-{
-  private:
-    Session session;
-    SubscriptionManager subscriptions;
-    std::string queue;
-    Message request;
-    QueueOptions args;
-  public:
-    Listener(Session& session);
-    void setup(bool browse);
-    void send(std::string kv);
-    void received(Message& message);
-    void browse();
-    void consume();
-};
-
-Listener::Listener(Session& s) :
-    session(s), subscriptions(s),
-    queue("LVQtester")
-{}
-
-void Listener::setup(bool browse)
-{
-    // set queue mode
-    args.setOrdering(browse?LVQ_NO_BROWSE:LVQ);
-
-    session.queueDeclare(arg::queue=queue, arg::exclusive=false, arg::autoDelete=false, arg::arguments=args);
-
-}
-
-void Listener::browse()
-{
-    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_NOT_ACQUIRED));
-    subscriptions.run();
-}
-
-void Listener::consume()
-{
-    subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED));
-    subscriptions.run();
-}
-
-void Listener::send(std::string kv)
-{
-    request.getDeliveryProperties().setRoutingKey(queue);
-
-    std::string key;
-    args.getLVQKey(key);
-    request.getHeaders().setString(key, kv);
-
-    std::ostringstream data;
-    data << kv;
-    if (kv != "last") data << "." << hex << this;
-    request.setData(data.str());
-
-    cout << "Sending Data: " << kv << "=" << data.str() << std::endl;
-    async(session).messageTransfer(arg::content=request);
-
-}
-
-void Listener::received(Message& response)
-{
-
-    cout << "Receiving Data:" << response.getData() << std::endl;
-/* if (response.getData() == "last"){
-     subscriptions.cancel(queue);
-   }
-*/
-}
-
-int main(int argc, char** argv)
-{
-    Args opts;
-    opts.parse(argc, argv);
-
-    if (opts.help) {
-        std::cout << opts << std::endl;
-        return 0;
-    }
-
-    Connection connection;
-    try {
-        connection.open(opts);
-        Session session = connection.newSession();
-        Listener listener(session);
-
-        switch (opts.mode)
-        {
-        case CONSUME:
-            listener.consume();
-            break;
-        case BROWSE:
-            listener.browse();
-            break;
-        case CREATE_LVQ:
-            listener.setup(false);
-            break;
-        case CREATE_LVQ_NO_BROWSE:
-            listener.setup(true);
-            break;
-        case WRITE:
-            listener.send("key1");
-            listener.send("key2");
-            listener.send("key3");
-            listener.send("key1");
-            listener.send("last");
-            break;
-        }
-        connection.close();
-        return 0;
-    } catch(const std::exception& error) {
-        std::cout << error.what() << std::endl;
-    }
-    return 1;
-}
-
-</programlisting>
-<!--h2--></section>
+        </listitem>
+      </itemizedlist>
+    </section>
+  </section>
+
+  <section role="h2" id="LVQ-Creating">
+    <title>Creating a Last Value Queue</title>
+    <section role="h3" id="LVQ-Creating-Address">
+      <title>Using Addressing Syntax</title>
+      <para>
+        A LVQ may be created using directives in the API's address syntax.
+        The important argument is "qpid.last_value_queue_key". The following
+        Python example shows how a producer of stock price updates can create
+        a LVQ to hold the latest stock prices for each ticker symbol. The
+        message header used to hold the ticker symbol is called "ticker".
+      </para>
+      <programlisting>
+        conn = Connection(url)
+        conn.open()
+        sess = conn.session()
+        tx = sess.sender("prices;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key':'ticker'}}}}")
+      </programlisting>
+    </section>
+    <section role="h3" id="LVQ-Creating-Tool">
+      <title>Using qpid-config</title>
+      <para>
+        The same LVQ as shown in the previous example can be created using the
+        qpid-config utility:
+      </para>
+      <programlisting>
+        $ qpid-config add queue prices --argument qpid.last_value_queue_key=ticker
+      </programlisting>
+    </section>
+  </section>
+
+  <section role="h2" id="LVQ-Example">
+    <title>LVQ Example</title>
+
+    <section role="h3" id="LVQ-Example-Sender">
+      <title>LVQ Sender</title>
+      <programlisting>
+        from qpid.messaging import Connection, Message
+
+        def send(sender, key, message):
+            message.properties["key"] = key
+            sender.send(message)
+
+        conn = Connection("localhost")
+        conn.open()
+        sess = conn.session()
+        tx = sess.sender("topic;{create:always, node:{type:queue,x-declare:{arguments:{'qpid.last_value_queue_key':key}}}}")
+
+        msg = Message("Content")
+        send(tx, "key1", msg);
+        send(tx, "key2", msg);
+        send(tx, "key3", msg);
+        send(tx, "key4", msg);
+        send(tx, "key2", msg);
+        send(tx, "key1", msg);
+
+        conn.close()
+      </programlisting>
+    </section>
+
+    <section role="h3" id="LVQ-Example-Receiver">
+      <title>LVQ Browsing Receiver</title>
+      <programlisting>
+        from qpid.messaging import Connection, Message
+        from time import sleep
+
+        conn = Connection("localhost")
+        conn.open()
+        sess = conn.session()
+        rx = sess.receiver("topic;{mode:browse}")
+
+        while True:
+            msg = rx.fetch()
+            sess.acknowledge()
+            print msg
+      </programlisting>
+    </section>
+  </section>
+
+  <section role="h2" id="LVQ-Deprecated">
+    <title>Deprecated LVQ Modes</title>
+    <para>
+      There are two legacy modes (still implemented as of Qpid 0.14)
+      controlled by the qpid.last_value_queue and
+      qpid.last_value_queue_no_browse argument values. These modes are
+      intended to be deprecated and should not be used.
+    </para>
+  </section>
 </section>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org
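
The replacement rule described in the new "Understanding LVQ" text of this revision can be modelled without a broker. The following is a minimal sketch in plain Python, not Qpid code: the `LastValueQueue` class, its methods, and the `replay` helper are hypothetical names invented for illustration, and the sketch assumes every message carries the key header. It replays the key sequence 1, 2, 3, 4, 2, 1 from the documentation's example.

```python
from collections import OrderedDict


class LastValueQueue:
    """Toy in-memory model of the documented LVQ behaviour.

    Illustration only: hypothetical class, no broker involved.
    """

    def __init__(self, key_header):
        self.key_header = key_header   # name of the header used as the key
        self._by_key = OrderedDict()   # key value -> newest body, in queue order

    def enqueue(self, headers, body):
        # Assumption: every message carries the key header.
        key = headers[self.key_header]
        # Discard any older message with the same key ...
        self._by_key.pop(key, None)
        # ... and place the new message on the tail of the queue.
        self._by_key[key] = body

    def keys(self):
        # Keys of the messages currently held, head of the queue first.
        return list(self._by_key)

    def dequeue(self):
        # Consume from the head, as a normal FIFO queue would.
        return self._by_key.popitem(last=False)


def replay(keys, key_header="ticker"):
    """Enqueue one message per key and return the resulting queue."""
    queue = LastValueQueue(key_header)
    for n, key in enumerate(keys):
        queue.enqueue({key_header: key}, "update %d" % n)
    return queue


if __name__ == "__main__":
    queue = replay([1, 2, 3, 4, 2, 1])
    print(queue.keys())  # prints [3, 4, 2, 1]
```

The final state matches the last row of the documentation's example (3 4 2 1), and the queue never holds more messages than there are distinct keys. An `OrderedDict` is used here only because it makes "remove the old entry, append at the tail" a two-line operation; it says nothing about how the broker implements the queue.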