There's nothing special about the put program.
The browse program browses messages from the queue and will acquire and accept every fourth message.

The code works in that, if you run the put program 5 times and then run browse, the first run of browse reads 20 messages. The second run of browse shows only 16 messages, although qpid-stat -q still shows the queue depth as 20.

//Bill

On 09/09/2009 01:12 PM, Bill Whiting wrote:
On 09/09/2009 12:50 PM, Carl Trieloff wrote:
Gordon Sim wrote:
On 09/09/2009 04:59 PM, Bill Whiting wrote:
Thanks, I'm able to browse messages non-destructively and I can
acquire/accept an individual message and remove it from the queue;
however, qpid-stat -q shows the same number of messages.

Is this a bug in qpid-stat or are those messages waiting for some
timeout before they become available again?

Acquired and accepted messages should be dequeued when the broker gets the accept and would then be reflected in the dequeued count shown by qpid-stat.

Note that stats get updated on the management interval for tools that subscribe to this info, like qpid-tool.

Carl.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscr...@qpid.apache.org


Increases in queue depth show immediately, but decreases in queue depth are not showing at all.
If I use qpid-tool -> show connection,
it showed the connection for qpid-tool and the last connection of my test application for several seconds (60+); now only the current connection shows, but the queue depth still has not decreased.

I'll clean up the code and post it.

//Bill

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscr...@qpid.apache.org



/*
 * amqbrowse.cpp
 *
 *  Created on: Sep 8, 2009
 *      Author: whiting
 */

#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>

#include <cstdlib>
#include <iostream>

#include <sstream>

using namespace qpid::client;
using namespace qpid::framing;

using std::stringstream;
using std::string;


int main(int argc, char** argv) {
    const char* host = argc>1 ? argv[1] : "127.0.0.1";
    int port = argc>2 ? atoi(argv[2]) : 5672;
    Connection connection;
    Session session;
    SubscriptionSettings settings;
    LocalQueue local_queue;
    SubscriptionManager *subMgr;
        Subscription subscription;

        try {
                connection.open(host, port);
                session =  connection.newSession();
                subMgr = new SubscriptionManager(session);

                settings.exclusive=false;
                settings.flowControl = FlowControl::messageWindow(20);
                settings.acceptMode = ACCEPT_MODE_NONE;
                settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
//!!!!          settings.acquireMode = ACQUIRE_MODE_PRE_ACQUIRED;
                subscription = subMgr->subscribe(local_queue, "RMS", settings, 
"rmsset");
        }
        catch(const std::exception& error) {
                std::cout << error.what() << std::endl;
        }

//      SubscriptionManager subscriptions(session);
//      subscriptions.subscribe(local_queue, "RMS", settings, "rmsset");

        try {
                int ix;
                ix = 0;
                Message message;
                while (true) {
                        if (local_queue.get(message, 
10000*qpid::sys::TIME_MSEC)) { // get with wait 5 sec
                                std::cout << "Response: " << message.getData() 
<< std::endl;
                                ix ++;
                                if (!(ix % 4)) {
                                        SequenceSet toBeAquired;
                                        toBeAquired.add(message.getId());
                                        std::cout << "Acquire and accept the 
last message" << std::endl;
                                        session.messageAcquire(toBeAquired);
                                        session.messageAccept(toBeAquired);
//                                      subscription.acquire(message);
//                                      subscription.accept(message);
                                }
                                session.sendCompletion();
                        }
                        else {
                                break;
                        }
                }
                subscription.cancel();
                std::cout << "The queue is now empty" << std::endl;
                session.close();
        }
        catch(const std::exception& error) {
                std::cout << error.what() << std::endl;
        }

}
/*
 * amqput.cpp
 *
 */

#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/QueueOptions.h>
#include <qpid/client/Message.h>
//#include <qpid/client/SubscriptionManager.h>

#include <cstdio>
#include <cstdlib>
#include <iostream>

#include <sstream>
#include <time.h>
#include <unistd.h>

using namespace qpid::client;
using namespace qpid::framing;

using std::stringstream;
using std::string;

int main(int argc, char** argv) {
    const char* host = argc>1 ? argv[1] : "127.0.0.1";
    int port = argc>2 ? atoi(argv[2]) : 5672;
    Connection connection;
    try {
        connection.open(host, port);
        Session session =  connection.newSession();

        //--------- Main body of program 
--------------------------------------------
        QueueOptions queueOptions;
        queueOptions.setOrdering(FIFO);

        session.queueDeclare(arg::queue="RMS", arg::exclusive = false, 
arg::autoDelete = false, arg::arguments  = queueOptions);
        session.exchangeBind(arg:: exchange="QM", arg::queue="RMS", 
arg::bindingKey="QM_ROUTE_KEY");
        Message message;
        message.getDeliveryProperties().setRoutingKey("QM_ROUTE_KEY");
        string buffer(256, 'A');

        Message request;
        request.getDeliveryProperties().setRoutingKey("QM_ROUTE_KEY");
//      request.getMessageProperties().setReplyTo(ReplyTo("RMS", 
response_queue.str()));

        // Now send some requests ...

        string s[] = {
            "Twas brillig, and the slithy toves",
            "Did gire and gymble in the wabe.",
            "All mimsy were the borogroves,",
            "And the mome raths outgrabe.",
                        "Beware the Jabberwock my son,"
        };


        for (int ix=0; ix<5; ix++) {
                char timestring[40];
                std::string messagedata;
  time_t rawtime;
  struct tm * timeinfo;

  time ( &rawtime );
  timeinfo = localtime ( &rawtime );
                        sprintf(timestring, "%04.4d-%02d-%02d %02d:%02d:%02d ", 
(timeinfo->tm_year + 1900),timeinfo->tm_mon, timeinfo->tm_mday, 
timeinfo->tm_hour,timeinfo->tm_min,timeinfo->tm_sec);
                        messagedata=timestring;
                        messagedata+= ":" + s[ix];
            request.setData(messagedata);
                        
            session.messageTransfer(arg::content=request, 
arg::destination="QM");
            std::cout << "Request: " << messagedata << std::endl;
                        sleep(1);
        }

        
//-----------------------------------------------------------------------------

        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscr...@qpid.apache.org

Reply via email to