Hi
I have created three thread two threads read asynchronoulsy from the queue and
one thread writes to the queue..
below is the code.
class listener : public MesageListener
{
// constructor
// received function;
};
unsigned int __stdcall ReadThreadProc(void *)
{
const char* host = "172.20.112.111"
int port = 5004;
try
{
Connection connection;
ConnectionSettings lcConnSettings;
lcConnSettings.host = host;
lcConnSettings.port = port;
lcConnSettings.maxFrameSize = 2048;
lcConnSettings.username = "guest";
lcConnSettings.password = "guest";
connection.open(lcConnSettings);
Session session = connection.newSession();
QueueOrderingPolicy cQOP = FIFO;
QueueOptions lcQueueOptions;
lcQueueOptions.setOrdering(cQOP);
session.exchangeDeclare(arg::exchange = "QM", arg::exclusive = false;
arg:autoDelete = false; arg::type = "direct", );
session.queueDeclare(arg::queue = "RMS", arg::exclusive = false,
arg::qutoDelete = false, arg::arguments = lcQueueOptions);
session.exchangeBind(arg::exchange = "QM", arg::queue = "RMS",
arg::bindingKey = "QM_ROUTE_KEY");
SubsriptionManager subscriptions(session);
Listener listener(subscriptions);
subscription.subscribe(listener, "RMS");
subscription.start();
Sleep(10000);
subscriptions.stop();
session.close();
connection.close();
}
catch(const exception &error )
{
cout << error.wha() << endl;
}
return 0;
}
unsigned int __stdcall WriteThreadProc(void *)
{
const char* host = "172.20.112.111"
int port = 5004;
try
{
Connection connection;
ConnectionSettings lcConnSettings;
lcConnSettings.host = host;
lcConnSettings.port = port;
lcConnSettings.maxFrameSize = 2048;
lcConnSettings.username = "guest";
lcConnSettings.password = "guest";
connection.open(lcConnSettings);
Session session = connection.newSession();
QueueOrderingPolicy cQOP = FIFO;
QueueOptions lcQueueOptions;
lcQueueOptions.setOrdering(cQOP);
session.exchangeDeclare(arg::exchange = "QM", arg::exclusive = false;
arg:autoDelete = false; arg::type = "direct", );
session.queueDeclare(arg::queue = "RMS", arg::exclusive = false,
arg::qutoDelete = false, arg::arguments = lcQueueOptions);
session.exchangeBind(arg::exchange = "QM", arg::queue = "RMS",
arg::bindingKey = "QM_ROUTE_KEY");
string buffer("Hello from 0676 ");
string lastdata(256, Q);
string target("QM");
string routeKey("QM_ROUTE_KEY");
Message message'
message.getDeliveryProperties().setRouteKey(routeKey);
for(int i = 0; i < 100; ++i)
{
message.setData(buffer);
async(session).messageTransfer(arg::content=message,
arg::destination=target, arg::acceptMode=message::ACCEPT_MODE_NONE,
arg::acquireMode=message::
ACQUIRE_MODE_PRE_ACQUIRED, arg::sync = false);
}
message.setData(lastdata);
async(session).messageTransfer(arg::content=message,
arg::destination=target, arg::acceptMode=message::ACCEPT_MODE_NONE,
arg::acquireMode=message::
ACQUIRE_MODE_PRE_ACQUIRED, arg::sync = false);
session.sync();
session.close();
connection.close()
}
catch(const exception & error)
{
const << error.what();
}
return 0;
}
int main()
{
thread1(readThreadproc)
thread2(writethreadproc);
thread3(readThreadProc);
}
Now the problem is it give exception if u check the call stack then
it show ptr = 0x00000000 line 664
qpid::sys::SystemInfo::getProcessName()
qpid::client::ConnectionHandler::ConnectionHandle()
qpid::client::ConnectionImpl::ConnectionImp()
qpid::client:Connection::open() ( open connection is from read thread)
tested the above code on windows
Awaiting your reply.
Regards,
Nithesh