Yep, something is definitely not right. Please create a JIRA issue here
http://issues.apache.org/activemq/browse/AMQCPP
... be sure to attach the code and your output image.
Thanks!
Nate
On 1/30/07, Lalit Nagpal <[EMAIL PROTECTED]> wrote:
Copy pasting the entire sample main code to reproduce my issue
#include "stdafx.h"
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
using namespace activemq::core;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;
bool GenerateGuid(char* buf, size_t strsz)
{
GUID a_guid;
UuidCreate(&a_guid);
if (strsz > 38)
{
sprintf(buf,
"{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}",
a_guid.Data1, a_guid.Data2, a_guid.Data3,
a_guid.Data4[0], a_guid.Data4[1],
a_guid.Data4[2], a_guid.Data4[3],
a_guid.Data4[4], a_guid.Data4[5],
a_guid.Data4[6], a_guid.Data4[7]
);
return true;
}
return false;
};
class HelloWorldProducer : public Runnable, public MessageListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
MessageConsumer* consumer;
int numMessages;
public:
HelloWorldProducer( int numMessages ){
connection = NULL;
session = NULL;
destination = NULL;
producer = NULL;
this->numMessages = numMessages;
}
virtual void onMessage( const Message* message ){
try
{
const TextMessage* textMessage = dynamic_cast< const
TextMessage*
>( message );
string text = textMessage->getText();
printf( "Producer Received: %s\n", text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual ~HelloWorldProducer(){
cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61617");
// Create a Connection
connection = connectionFactory->createConnection();
connection->start();
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
// Create the destination (Topic or Queue)
destination = session->createTopic( "TEST.FOO" );
// Create a MessageProducer from the Session to the Topic or
Queue
cout << endl << "Producer has been registered at "
<<
destination->toString() << endl << endl;
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
// Lets have a reply-back channel to this producer
now
char* charGuid = new char[40];
GenerateGuid(charGuid, 39);
cms::Topic* replyTopic =
session->createTopic(charGuid);
consumer = session->createConsumer( replyTopic );
consumer->setMessageListener(this);
cout << "Reply back channel has been registered at
" <<
replyTopic->toString() << endl << endl;
// Stringify the thread id
char threadIdStr[100];
_snprintf(threadIdStr, sizeof(threadIdStr), "%d",
Thread::getId() );
// Create a messages
string text = (string)"Hello world! from thread " +
threadIdStr;
for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage(
text );
message->setCMSReplyTo(replyTopic->toProviderString());
// Tell the producer to send the message
printf( "Producer Sent message from thread %s\n",
threadIdStr);
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) {}
destination = NULL;
try{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) {}
producer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) {}
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) {}
connection = NULL;
}
};
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
long waitMillis;
public:
HelloWorldConsumer( long waitMillis ){
connection = NULL;
session = NULL;
destination = NULL;
consumer = NULL;
this->waitMillis = waitMillis;
}
virtual ~HelloWorldConsumer(){
cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
connection->start();
connection->setExceptionListener(this);
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
// Create the destination (Topic or Queue)
destination = session->createTopic( "TEST.FOO" );
// Create a MessageConsumer from the Session to the Topic or
Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
// Sleep while asynchronous messages come in.
Thread::sleep( waitMillis );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const Message* message ){
try
{
// display the message received
const TextMessage* textMessage = dynamic_cast< const
TextMessage*
>( message );
string text = textMessage->getText();
printf( "Consumer Received: %s\n", text.c_str() );
// lets reply back with a thanks
if (textMessage->getCMSReplyTo().c_str() &&
strcmp("null",
textMessage->getCMSReplyTo().c_str())==1 ) {
cms::Topic* destination_ =
session->createTopic(
textMessage->getCMSReplyTo().c_str() );
MessageProducer* producer =
session->createProducer( destination_ );
producer->setDeliveryMode(
DeliveryMode::NON_PERSISTANT );
cout << endl << "Consumer Replying back to
" << destination_->toString()
<< endl;
TextMessage* message_ =
session->createTextMessage( "Thank you for Hello
World !!!" );
producer->send(message_);
}
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onException( const CMSException& ex ) {
printf("JMS Exception occured. Shutting down client.\n");
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
int _tmain(int argc, _TCHAR* argv[])
{
HelloWorldProducer producer( 5 );
HelloWorldConsumer consumer( 1500 );
// Start the consumer thread.
Thread consumerThread( &consumer );
consumerThread.start();
// Start the producer thread.
Thread producerThread( &producer );
producerThread.start();
// Wait for the threads to complete.
producerThread.join();
consumerThread.join();
cout << endl << endl << endl;
return 0;
}
Lalit Nagpal wrote:
>
> Hi
>
> I am using the activemq-cpp cms api 1.0 release. The problem I am facing
> is like this-
> My producer sends a message to the consumer and a message should be sent
> from the receiving end as a reply after this - consider a situation
where
> a loginRequest message has been sent and now a loginReply message should
> be sent from the receiving end.
>
> Attached is a sample main that can reproduce the problem I am facing - I
> have modified the sample helloproducer helloconsumer code available at
> http://activemq.org/site/activemq-cpp-client.html
> to reproduce my problem so that its easier for you to see.
>
> http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp
>
> If you execute this piece of code you will see the output as in the
> attached image file
>
> http://www.nabble.com/file/6110/DestinationProblem.JPG
>
> This is what the code does
> producer lets call it xxx sends a "Hello world! from thread xxxx" to the
> consumer
> consumer lets call it yyy receives the message and displays it
> this is the normal behavior as given in the example on
> http://activemq.org/site/activemq-cpp-client.html
> Following extra needs to be done now
> from yyy a reply should go back to xxx ... for this i registered a
> producer at yyy by creating a topic using the message->getCMSReplyTo()
and
> then replying back to that destination.
>
> The mismatch can be easily see by doing a bstat .... when I created the
> consumer yyy initially it created a topic by name say ABCDEFGH (which is
a
> random id) and later on when I used the message->getCMSReplyTo() to
create
> a topic the topic was registered with the name /topic/ABCDEFGH ..... the
> additional /topic/ that has got added is doing a mess up here and the
> replies from yyy to xxx are not reaching xxx (getting enqueued and not
> dequeued) ...
>
> the /topic/ gets added due to the following statement in
> HelloWorldProducer - run method
> message->setCMSReplyTo(replyTopic->toProviderString());
> here the toProviderString method adds it actually ... if you replace
this
> method with just the toString() method ... you will get a stomp
exception
> saying that destinations should start with either /topic/ or /queue/
>
> Can somebody make this code work please.
>
> For every message sent by the producer to consumer "Hello world! from
> thread xxxx" there should be a reply coming back as "Thank you for Hello
> World !!!"
>
> Please help me urgently here.
>
> Thank you in advance
>
> Lalit Nagpal
> CSA, SunGard
>
>
--
View this message in context:
http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155
Sent from the ActiveMQ - User mailing list archive at Nabble.com.