"retroactive consumers",
the following code is just what I wanted:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.retroactive=true");
consumer = session.createConsumer(queue);Thanks a lot. Adrian Co wrote: > > I do not understand what you want. If you want to send first, then still > receive later using topics, you could try using retroactive consumers: > http://www.activemq.org/site/retroactive-consumer.html or create durable > subscribers for more reliable messaging, > > sgliu wrote: >> Send first,then receive.In topic.setCMSExpiration() function? >> What can I do? >> >> >> >> Adrian Co wrote: >> >>> Try subscribing the consumers first before sending messages. >>> >>> int main(int argc, char* argv[]) >>> { >>> Consumer1(); >>> Consumer2(); >>> >>> Produce(); >>> >>> } >>> >>> >>> >>> sgliu wrote: >>> >>>> Please help me. >>>> >>>> >>>> sgliu wrote: >>>> >>>> >>>>> I wish I producer message A, and then I exit the producer program. >>>>> Then >>>>> I >>>>> start two consumer program(one is C1,the other is C2) at same time.C1 >>>>> can >>>>> receive A , C2 can receive A. >>>>> >>>>> #include <activemq/concurrent/Thread.h> >>>>> #include <activemq/concurrent/Runnable.h> >>>>> #include <activemq/core/ActiveMQConnectionFactory.h> >>>>> #include <activemq/util/Integer.h> >>>>> #include <cms/Connection.h> >>>>> #include <cms/Session.h> >>>>> #include <cms/TextMessage.h> >>>>> #include <cms/ExceptionListener.h> >>>>> #include <cms/MessageListener.h> >>>>> #include <stdlib.h> >>>>> >>>>> using namespace activemq::core; >>>>> using namespace activemq::util; >>>>> using namespace activemq::concurrent; >>>>> using namespace cms; >>>>> using namespace std; >>>>> >>>>> class HelloWorldProducer : public Runnable { >>>>> private: >>>>> >>>>> Connection* connection; >>>>> Session* session; >>>>> Topic* destination; >>>>> MessageProducer* producer; >>>>> int numMessages; >>>>> >>>>> public: >>>>> >>>>> HelloWorldProducer( int numMessages ){ >>>>> connection = NULL; >>>>> session = NULL; >>>>> destination = NULL; >>>>> producer = NULL; >>>>> this->numMessages = numMessages; >>>>> } >>>>> >>>>> virtual 
~HelloWorldProducer(){ >>>>> cleanup(); >>>>> } >>>>> >>>>> virtual void run() { >>>>> try { >>>>> string user,passwd,sID; >>>>> user="default"; >>>>> passwd=""; >>>>> sID="lsgID"; >>>>> ActiveMQConnectionFactory* connectionFactory = new >>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); >>>>> >>>>> connection = >>>>> connectionFactory->createConnection(user,passwd,sID); >>>>> connection->start(); >>>>> >>>>> string sss=connection->getClientId(); >>>>> cout << sss << endl; >>>>> >>>>> session = connection->createSession( >>>>> Session::AUTO_ACKNOWLEDGE >>>>> ); >>>>> destination = session->createTopic( "mytopic" >>>>> ); >>>>> >>>>> producer = session->createProducer( >>>>> destination >>>>> ); >>>>> producer->setDeliveryMode( DeliveryMode::PERSISTANT ); >>>>> >>>>> producer->setTimeToLive(100000000); >>>>> string threadIdStr = Integer::toString( Thread::getId() ); >>>>> >>>>> // Create a messages >>>>> string text = (string)"Hello world! from thread " + >>>>> threadIdStr; >>>>> >>>>> for( int ix=0; ix<numMessages; ++ix ){ >>>>> TextMessage* message = session->createTextMessage( >>>>> text ); >>>>> >>>>> string messageID="messageID"; >>>>> >>>>> message->setCMSExpiration(10000000000); >>>>> message->setCMSMessageId(messageID); >>>>> >>>>> // Tell the producer to send the message >>>>> printf( "Sent message from thread %s\n", >>>>> threadIdStr.c_str() >>>>> ); >>>>> producer->send( message ); >>>>> >>>>> delete message; >>>>> } >>>>> >>>>> }catch ( CMSException& e ) { >>>>> e.printStackTrace(); >>>>> } >>>>> } >>>>> >>>>> private: >>>>> >>>>> void cleanup(){ >>>>> >>>>> // Destroy resources. >>>>> try{ >>>>> if( destination != NULL ) delete destination; >>>>> }catch ( CMSException& e ) {} >>>>> destination = NULL; >>>>> >>>>> try{ >>>>> if( producer != NULL ) delete producer; >>>>> }catch ( CMSException& e ) {} >>>>> producer = NULL; >>>>> >>>>> // Close open resources. 
>>>>> try{ >>>>> if( session != NULL ) session->close(); >>>>> if( connection != NULL ) connection->close(); >>>>> }catch ( CMSException& e ) {} >>>>> >>>>> try{ >>>>> if( session != NULL ) delete session; >>>>> }catch ( CMSException& e ) {} >>>>> session = NULL; >>>>> >>>>> try{ >>>>> if( connection != NULL ) delete connection; >>>>> }catch ( CMSException& e ) {} >>>>> connection = NULL; >>>>> } >>>>> }; >>>>> >>>>> class HelloWorldConsumer : public ExceptionListener, >>>>> public MessageListener, >>>>> public Runnable { >>>>> >>>>> private: >>>>> >>>>> Connection* connection; >>>>> Session* session; >>>>> Topic* destination; >>>>> MessageConsumer* consumer; >>>>> long waitMillis; >>>>> >>>>> public: >>>>> >>>>> HelloWorldConsumer( long waitMillis ){ >>>>> connection = NULL; >>>>> session = NULL; >>>>> destination = NULL; >>>>> consumer = NULL; >>>>> this->waitMillis = waitMillis; >>>>> } >>>>> virtual ~HelloWorldConsumer(){ >>>>> cleanup(); >>>>> } >>>>> >>>>> virtual void run() { >>>>> >>>>> try { >>>>> >>>>> string user,passwd,sID; >>>>> user="default"; >>>>> passwd=""; >>>>> sID="lsgID"; >>>>> // Create a ConnectionFactory >>>>> ActiveMQConnectionFactory* connectionFactory = >>>>> new ActiveMQConnectionFactory( >>>>> "tcp://localhost:61613",user,passwd,sID); >>>>> >>>>> // Create a Connection >>>>> connection = >>>>> connectionFactory->createConnection();//user,passwd,sID); >>>>> delete connectionFactory; >>>>> connection->start(); >>>>> >>>>> connection->setExceptionListener(this); >>>>> >>>>> // Create a Session >>>>> session = connection->createSession( >>>>> Session::AUTO_ACKNOWLEDGE >>>>> ); >>>>> destination = session->createTopic( "mytopic" >>>>> ); >>>>> consumer = session->createDurableConsumer( >>>>> destination , user , "",false); >>>>> >>>>> consumer->setMessageListener( this ); >>>>> >>>>> Thread::sleep( waitMillis ); >>>>> >>>>> } catch (CMSException& e) { >>>>> e.printStackTrace(); >>>>> } >>>>> } >>>>> >>>>> virtual void onMessage( const 
Message* message ){ >>>>> >>>>> try >>>>> { >>>>> const TextMessage* textMessage = >>>>> dynamic_cast< const TextMessage* >( message ); >>>>> string text = textMessage->getText(); >>>>> printf( "Received: %s\n", text.c_str() ); >>>>> } catch (CMSException& e) { >>>>> e.printStackTrace(); >>>>> } >>>>> } >>>>> >>>>> virtual void onException( const CMSException& ex ) { >>>>> printf("JMS Exception occured. Shutting down client.\n"); >>>>> } >>>>> >>>>> private: >>>>> >>>>> void cleanup(){ >>>>> >>>>> // Destroy resources. >>>>> try{ >>>>> if( destination != NULL ) delete destination; >>>>> }catch (CMSException& e) {} >>>>> destination = NULL; >>>>> >>>>> try{ >>>>> if( consumer != NULL ) delete consumer; >>>>> }catch (CMSException& e) {} >>>>> consumer = NULL; >>>>> >>>>> // Close open resources. >>>>> try{ >>>>> if( session != NULL ) session->close(); >>>>> if( connection != NULL ) connection->close(); >>>>> }catch (CMSException& e) {} >>>>> >>>>> try{ >>>>> if( session != NULL ) delete session; >>>>> }catch (CMSException& e) {} >>>>> session = NULL; >>>>> >>>>> try{ >>>>> if( connection != NULL ) delete connection; >>>>> }catch (CMSException& e) {} >>>>> connection = NULL; >>>>> } >>>>> }; >>>>> void Produce() >>>>> { >>>>> HelloWorldProducer producer( 2 ); >>>>> Thread producerThread( &producer ); >>>>> producerThread.start(); >>>>> producerThread.join(); >>>>> } >>>>> void Consumer1() >>>>> { >>>>> HelloWorldConsumer consumer( 10000 ); >>>>> Thread consumerThread( &consumer ); >>>>> consumerThread.start(); >>>>> consumerThread.join(); >>>>> } >>>>> void Consumer2() >>>>> { >>>>> HelloWorldConsumer consumer( 10000 ); >>>>> Thread consumerThread( &consumer ); >>>>> consumerThread.start(); >>>>> consumerThread.join(); >>>>> } >>>>> int main(int argc, char* argv[]) >>>>> { >>>>> Produce(); >>>>> Consumer1(); >>>>> Consumer2(); >>>>> } >>>>> >>>>> >>>>> >>>> >>>> >>> >>> >> >> > > > -- View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7535583 Sent 
from the ActiveMQ - User mailing list archive at Nabble.com.
