I need to send first, then receive. Can this be done with the setCMSExpiration() function on the topic?
What can I do?
Adrian Co wrote:
>
> Try subscribing the consumers first before sending messages.
>
> // Suggested ordering: run both consumers (so they subscribe) before the
> // producer sends any messages.
> int main(int argc, char* argv[])
> {
> Consumer1();
> Consumer2();
>
> Produce();
>
> }
>
>
>
> sgliu wrote:
>> Please help me.
>>
>>
>> sgliu wrote:
>>
>>> I want to produce message A and then exit the producer program. Then I
>>> start two consumer programs (one is C1, the other is C2) at the same time.
>>> C1 should receive A, and C2 should receive A as well.
>>>
>>> #include <activemq/concurrent/Thread.h>
>>> #include <activemq/concurrent/Runnable.h>
>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>> #include <activemq/util/Integer.h>
>>> #include <cms/Connection.h>
>>> #include <cms/Session.h>
>>> #include <cms/TextMessage.h>
>>> #include <cms/ExceptionListener.h>
>>> #include <cms/MessageListener.h>
>>> #include <stdlib.h>
>>>
>>> using namespace activemq::core;
>>> using namespace activemq::util;
>>> using namespace activemq::concurrent;
>>> using namespace cms;
>>> using namespace std;
>>>
>>> /**
>>>  * Runnable that connects to the broker at tcp://localhost:61613, publishes
>>>  * numMessages text messages to the topic "mytopic", and releases every CMS
>>>  * resource it created when it is destroyed.
>>>  */
>>> class HelloWorldProducer : public Runnable {
>>> private:
>>>
>>>     Connection* connection;
>>>     Session* session;
>>>     Topic* destination;
>>>     MessageProducer* producer;
>>>     int numMessages;   // how many messages run() sends
>>>
>>> public:
>>>
>>>     /**
>>>      * @param numMessages number of text messages run() will send.
>>>      */
>>>     HelloWorldProducer( int numMessages ){
>>>         connection = NULL;
>>>         session = NULL;
>>>         destination = NULL;
>>>         producer = NULL;
>>>         this->numMessages = numMessages;
>>>     }
>>>
>>>     virtual ~HelloWorldProducer(){
>>>         cleanup();
>>>     }
>>>
>>>     /**
>>>      * Opens the connection, session, topic and producer, then sends the
>>>      * messages. Any CMSException is caught here and its stack trace printed;
>>>      * resources opened so far are released later by cleanup().
>>>      */
>>>     virtual void run() {
>>>         try {
>>>             string user,passwd,sID;
>>>             user="default";
>>>             passwd="";
>>>             sID="lsgID";
>>>
>>>             // The factory is only needed to create the connection. Keeping it
>>>             // on the stack releases it on every path, including when
>>>             // createConnection() throws (it used to be leaked).
>>>             ActiveMQConnectionFactory connectionFactory(
>>>                 "tcp://localhost:61613",user,passwd,sID);
>>>
>>>             connection = connectionFactory.createConnection(user,passwd,sID);
>>>             connection->start();
>>>
>>>             string sss=connection->getClientId();
>>>             cout << sss << endl;
>>>
>>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>>             destination = session->createTopic( "mytopic" );
>>>
>>>             producer = session->createProducer( destination );
>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>>
>>>             producer->setTimeToLive(100000000);
>>>             string threadIdStr = Integer::toString( Thread::getId() );
>>>
>>>             // Build the message text once; every message carries the same body.
>>>             string text = string("Hello world! from thread ") + threadIdStr;
>>>
>>>             for( int ix=0; ix<numMessages; ++ix ){
>>>                 TextMessage* message = session->createTextMessage( text );
>>>
>>>                 try {
>>>                     string messageID="messageID";
>>>                     message->setCMSExpiration(10000000000);
>>>                     message->setCMSMessageId(messageID);
>>>
>>>                     // Tell the producer to send the message
>>>                     printf( "Sent message from thread %s\n", threadIdStr.c_str() );
>>>                     producer->send( message );
>>>                 } catch ( ... ) {
>>>                     // Do not leak the message when configuring or sending it
>>>                     // fails; the outer handler still reports the error.
>>>                     delete message;
>>>                     throw;
>>>                 }
>>>
>>>                 delete message;
>>>             }
>>>
>>>         }catch ( CMSException& e ) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>> private:
>>>
>>>     /**
>>>      * Destroys the destination and producer, closes the session and the
>>>      * connection, then deletes both. CMSExceptions are swallowed on purpose
>>>      * so that one failing step does not prevent the remaining ones.
>>>      */
>>>     void cleanup(){
>>>
>>>         // Destroy resources.
>>>         try{
>>>             if( destination != NULL ) delete destination;
>>>         }catch ( CMSException& e ) {}
>>>         destination = NULL;
>>>
>>>         try{
>>>             if( producer != NULL ) delete producer;
>>>         }catch ( CMSException& e ) {}
>>>         producer = NULL;
>>>
>>>         // Close open resources. Each close is guarded separately so that a
>>>         // failure closing the session still lets the connection be closed.
>>>         try{
>>>             if( session != NULL ) session->close();
>>>         }catch ( CMSException& e ) {}
>>>
>>>         try{
>>>             if( connection != NULL ) connection->close();
>>>         }catch ( CMSException& e ) {}
>>>
>>>         try{
>>>             if( session != NULL ) delete session;
>>>         }catch ( CMSException& e ) {}
>>>         session = NULL;
>>>
>>>         try{
>>>             if( connection != NULL ) delete connection;
>>>         }catch ( CMSException& e ) {}
>>>         connection = NULL;
>>>     }
>>> };
>>>
>>> /**
>>>  * Runnable that connects to the broker at tcp://localhost:61613, registers
>>>  * itself as a durable consumer on the topic "mytopic", and prints every text
>>>  * message delivered while run() is sleeping. It also acts as the
>>>  * connection's exception listener.
>>>  *
>>>  * NOTE(review): every instance uses the same client ID ("lsgID") and passes
>>>  * the same name ("default") to createDurableConsumer(), so presumably all
>>>  * instances share one durable subscription -- confirm against the broker's
>>>  * durable-subscriber semantics.
>>>  */
>>> class HelloWorldConsumer : public ExceptionListener,
>>>                            public MessageListener,
>>>                            public Runnable {
>>>
>>> private:
>>>
>>>     Connection* connection;
>>>     Session* session;
>>>     Topic* destination;
>>>     MessageConsumer* consumer;
>>>     long waitMillis;   // value handed to Thread::sleep() in run()
>>>
>>> public:
>>>
>>>     /**
>>>      * @param waitMillis how long run() sleeps after subscribing, i.e. how
>>>      *                   long messages can be received.
>>>      */
>>>     HelloWorldConsumer( long waitMillis ){
>>>         connection = NULL;
>>>         session = NULL;
>>>         destination = NULL;
>>>         consumer = NULL;
>>>         this->waitMillis = waitMillis;
>>>     }
>>>     virtual ~HelloWorldConsumer(){
>>>         cleanup();
>>>     }
>>>
>>>     /**
>>>      * Opens the connection, session, topic and durable consumer, installs
>>>      * this object as message listener, then sleeps for waitMillis. Any
>>>      * CMSException is caught here and its stack trace printed.
>>>      */
>>>     virtual void run() {
>>>
>>>         try {
>>>
>>>             string user,passwd,sID;
>>>             user="default";
>>>             passwd="";
>>>             sID="lsgID";
>>>
>>>             // Create a ConnectionFactory. It lives on the stack so it is
>>>             // released even when createConnection() throws.
>>>             ActiveMQConnectionFactory connectionFactory(
>>>                 "tcp://localhost:61613",user,passwd,sID);
>>>
>>>             // Create a Connection. No arguments are passed here; presumably
>>>             // the values given to the factory above are used -- confirm.
>>>             connection = connectionFactory.createConnection();
>>>             connection->start();
>>>
>>>             connection->setExceptionListener(this);
>>>
>>>             // Create a Session
>>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>>             destination = session->createTopic( "mytopic" );
>>>             consumer = session->createDurableConsumer(
>>>                 destination , user , "",false);
>>>
>>>             consumer->setMessageListener( this );
>>>
>>>             // Keep this thread alive while messages are delivered to
>>>             // onMessage().
>>>             Thread::sleep( waitMillis );
>>>
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     /**
>>>      * Prints the text of a received message. Messages that are not
>>>      * TextMessages are reported and skipped instead of being dereferenced
>>>      * through a null pointer.
>>>      */
>>>     virtual void onMessage( const Message* message ){
>>>
>>>         try
>>>         {
>>>             const TextMessage* textMessage =
>>>                 dynamic_cast< const TextMessage* >( message );
>>>             if( textMessage == NULL ) {
>>>                 // dynamic_cast yields NULL for any other message type.
>>>                 printf( "Received a non-text message; ignoring it.\n" );
>>>                 return;
>>>             }
>>>             string text = textMessage->getText();
>>>             printf( "Received: %s\n", text.c_str() );
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     /**
>>>      * Exception-listener callback registered on the connection in run();
>>>      * only prints a notice.
>>>      */
>>>     virtual void onException( const CMSException& ex ) {
>>>         printf("JMS Exception occured. Shutting down client.\n");
>>>     }
>>>
>>> private:
>>>
>>>     /**
>>>      * Destroys the destination and consumer, closes the session and the
>>>      * connection, then deletes both. CMSExceptions are swallowed on purpose
>>>      * so that one failing step does not prevent the remaining ones.
>>>      */
>>>     void cleanup(){
>>>
>>>         // Destroy resources.
>>>         try{
>>>             if( destination != NULL ) delete destination;
>>>         }catch (CMSException& e) {}
>>>         destination = NULL;
>>>
>>>         try{
>>>             if( consumer != NULL ) delete consumer;
>>>         }catch (CMSException& e) {}
>>>         consumer = NULL;
>>>
>>>         // Close open resources. Each close is guarded separately so that a
>>>         // failure closing the session still lets the connection be closed.
>>>         try{
>>>             if( session != NULL ) session->close();
>>>         }catch (CMSException& e) {}
>>>
>>>         try{
>>>             if( connection != NULL ) connection->close();
>>>         }catch (CMSException& e) {}
>>>
>>>         try{
>>>             if( session != NULL ) delete session;
>>>         }catch (CMSException& e) {}
>>>         session = NULL;
>>>
>>>         try{
>>>             if( connection != NULL ) delete connection;
>>>         }catch (CMSException& e) {}
>>>         connection = NULL;
>>>     }
>>> };
>>> // Runs a HelloWorldProducer configured for two messages on its own thread
>>> // and blocks until that thread has finished.
>>> void Produce()
>>> {
>>>     HelloWorldProducer sender( 2 );
>>>     Thread senderThread( &sender );
>>>
>>>     senderThread.start();
>>>     senderThread.join();
>>> }
>>> // Runs a HelloWorldConsumer (waitMillis = 10000) on its own thread and
>>> // blocks until that thread has finished.
>>> void Consumer1()
>>> {
>>>     HelloWorldConsumer receiver( 10000 );
>>>     Thread receiverThread( &receiver );
>>>
>>>     receiverThread.start();
>>>     receiverThread.join();
>>> }
>>> // Same as Consumer1(): runs a HelloWorldConsumer (waitMillis = 10000) on
>>> // its own thread and blocks until that thread has finished.
>>> void Consumer2()
>>> {
>>>     HelloWorldConsumer receiver( 10000 );
>>>     Thread receiverThread( &receiver );
>>>
>>>     receiverThread.start();
>>>     receiverThread.join();
>>> }
>>> // Entry point: publishes first, then runs the two consumers one after the
>>> // other (each call returns only after its thread has been joined).
>>> int main(int argc, char* argv[])
>>> {
>>>     Produce();
>>>
>>>     Consumer1();
>>>     Consumer2();
>>>
>>>     return 0;
>>> }
>>>
>>>
>>
>>
>
>
>
--
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7500819
Sent from the ActiveMQ - User mailing list archive at Nabble.com.