Please help me.
sgliu wrote:
>
> I want to produce message A and then exit the producer program. Then I
> start two consumer programs (one is C1, the other is C2) at the same time.
> C1 should receive A, and C2 should also receive A.
>
> #include <activemq/concurrent/Thread.h>
> #include <activemq/concurrent/Runnable.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Integer.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
>
> using namespace activemq::core;
> using namespace activemq::util;
> using namespace activemq::concurrent;
> using namespace cms;
> using namespace std;
>
> /**
>  * Runnable that connects to the broker at tcp://localhost:61613 and
>  * publishes a fixed number of persistent text messages to the topic
>  * "mytopic".
>  *
>  * The connection, session, destination and producer created by run() are
>  * owned by this object and released by cleanup(), which the destructor
>  * invokes.
>  */
> class HelloWorldProducer : public Runnable {
> private:
>
>     Connection* connection;
>     Session* session;
>     Topic* destination;
>     MessageProducer* producer;
>     int numMessages;    // number of messages run() publishes
>
> public:
>
>     /**
>      * @param numMessages how many text messages run() sends.
>      */
>     HelloWorldProducer( int numMessages )
>         : connection( NULL ),
>           session( NULL ),
>           destination( NULL ),
>           producer( NULL ),
>           numMessages( numMessages ) {
>     }
>
>     virtual ~HelloWorldProducer(){
>         cleanup();
>     }
>
>     /**
>      * Opens the connection, creates the topic producer and sends
>      * numMessages text messages. Any CMSException is reported with
>      * printStackTrace() and ends the run; it is not propagated.
>      */
>     virtual void run() {
>         try {
>             string user = "default";
>             string passwd = "";
>             string sID = "lsgID";
>
>             {
>                 // The factory is only needed to open the connection, so it
>                 // lives in its own scope and is released even when
>                 // createConnection() throws.
>                 ActiveMQConnectionFactory connectionFactory(
>                     "tcp://localhost:61613", user, passwd, sID );
>                 connection =
>                     connectionFactory.createConnection( user, passwd, sID );
>             }
>             connection->start();
>
>             // Print the client id the connection actually ended up with.
>             string sss = connection->getClientId();
>             cout << sss << endl;
>
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>             destination = session->createTopic( "mytopic" );
>
>             producer = session->createProducer( destination );
>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>             producer->setTimeToLive( 100000000 );
>
>             string threadIdStr = Integer::toString( Thread::getId() );
>
>             // Every message carries the same text and the same id.
>             string text = string( "Hello world! from thread " ) + threadIdStr;
>             string messageID = "messageID";
>
>             for( int ix = 0; ix < numMessages; ++ix ) {
>                 TextMessage* message = session->createTextMessage( text );
>                 try {
>                     message->setCMSExpiration( 10000000000 );
>                     message->setCMSMessageId( messageID );
>
>                     // Tell the producer to send the message
>                     printf( "Sent message from thread %s\n",
>                             threadIdStr.c_str() );
>                     producer->send( message );
>                 } catch( ... ) {
>                     // Do not leak the message when a setter or send()
>                     // throws; the outer handler still reports the error.
>                     delete message;
>                     throw;
>                 }
>                 delete message;
>             }
>
>         } catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
> private:
>
>     /**
>      * Releases everything run() created. Each step is guarded on its own
>      * so that one failing step does not prevent the remaining ones.
>      */
>     void cleanup(){
>
>         // Destroy resources (deleting a NULL pointer is a no-op).
>         try {
>             delete destination;
>         } catch ( CMSException& ) {}
>         destination = NULL;
>
>         try {
>             delete producer;
>         } catch ( CMSException& ) {}
>         producer = NULL;
>
>         // Close open resources. The two closes are guarded separately so
>         // that the connection is still closed when closing the session
>         // throws.
>         try {
>             if( session != NULL ) session->close();
>         } catch ( CMSException& ) {}
>
>         try {
>             if( connection != NULL ) connection->close();
>         } catch ( CMSException& ) {}
>
>         try {
>             delete session;
>         } catch ( CMSException& ) {}
>         session = NULL;
>
>         try {
>             delete connection;
>         } catch ( CMSException& ) {}
>         connection = NULL;
>     }
> };
>
> /**
>  * Runnable that connects to the broker at tcp://localhost:61613, creates
>  * a durable consumer on the topic "mytopic" and prints every text
>  * message it receives while run() sleeps for waitMillis.
>  *
>  * The object registers itself as both the connection's exception
>  * listener and the consumer's message listener.
>  *
>  * NOTE(review): every instance subscribes under the same durable
>  * subscription name ("default"). Presumably two consumers that must each
>  * receive every message need distinct client ids / subscription names,
>  * registered before the message is published -- confirm against the
>  * broker's durable-subscription documentation.
>  */
> class HelloWorldConsumer : public ExceptionListener,
>                            public MessageListener,
>                            public Runnable {
>
> private:
>
>     Connection* connection;
>     Session* session;
>     Topic* destination;
>     MessageConsumer* consumer;
>     long waitMillis;    // how long run() keeps listening, passed to Thread::sleep()
>
> public:
>
>     /**
>      * @param waitMillis time run() sleeps while messages are delivered
>      *                   to onMessage().
>      */
>     HelloWorldConsumer( long waitMillis )
>         : connection( NULL ),
>           session( NULL ),
>           destination( NULL ),
>           consumer( NULL ),
>           waitMillis( waitMillis ) {
>     }
>
>     virtual ~HelloWorldConsumer(){
>         cleanup();
>     }
>
>     /**
>      * Opens the connection, subscribes to "mytopic" and then sleeps for
>      * waitMillis. Any CMSException is reported with printStackTrace()
>      * and ends the run; it is not propagated.
>      */
>     virtual void run() {
>
>         try {
>
>             string user = "default";
>             string passwd = "";
>             string sID = "lsgID";
>
>             {
>                 // Create a ConnectionFactory. It is only needed to open
>                 // the connection, so it lives in its own scope and is
>                 // released even when createConnection() throws.
>                 ActiveMQConnectionFactory connectionFactory(
>                     "tcp://localhost:61613", user, passwd, sID );
>
>                 // Create a Connection. No arguments are passed here;
>                 // presumably the values given to the factory are used.
>                 connection = connectionFactory.createConnection();
>             }
>             connection->start();
>
>             connection->setExceptionListener( this );
>
>             // Create a Session
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>             destination = session->createTopic( "mytopic" );
>
>             // The user name doubles as the subscription name.
>             consumer = session->createDurableConsumer(
>                 destination, user, "", false );
>
>             consumer->setMessageListener( this );
>
>             // Messages arrive through onMessage() while this thread sleeps.
>             Thread::sleep( waitMillis );
>
>         } catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
>     /**
>      * Prints the text of a received message. Messages that are not
>      * TextMessage instances are reported and skipped.
>      */
>     virtual void onMessage( const Message* message ){
>
>         try {
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             if( textMessage == NULL ) {
>                 // The cast yields NULL for any other message type (and
>                 // for a NULL message); dereferencing it would crash.
>                 printf( "Received a non-text message; ignoring it.\n" );
>                 return;
>             }
>
>             string text = textMessage->getText();
>             printf( "Received: %s\n", text.c_str() );
>         } catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
>     /**
>      * Connection-level error callback. It only logs a fixed line; the
>      * exception itself is not inspected and nothing is shut down here.
>      */
>     virtual void onException( const CMSException& ex ) {
>         (void)ex;   // intentionally unused
>         printf("JMS Exception occured. Shutting down client.\n");
>     }
>
> private:
>
>     /**
>      * Releases everything run() created. Each step is guarded on its own
>      * so that one failing step does not prevent the remaining ones.
>      */
>     void cleanup(){
>
>         // Destroy resources (deleting a NULL pointer is a no-op).
>         try {
>             delete destination;
>         } catch ( CMSException& ) {}
>         destination = NULL;
>
>         try {
>             delete consumer;
>         } catch ( CMSException& ) {}
>         consumer = NULL;
>
>         // Close open resources. The two closes are guarded separately so
>         // that the connection is still closed when closing the session
>         // throws.
>         try {
>             if( session != NULL ) session->close();
>         } catch ( CMSException& ) {}
>
>         try {
>             if( connection != NULL ) connection->close();
>         } catch ( CMSException& ) {}
>
>         try {
>             delete session;
>         } catch ( CMSException& ) {}
>         session = NULL;
>
>         try {
>             delete connection;
>         } catch ( CMSException& ) {}
>         connection = NULL;
>     }
> };
> /**
>  * Runs a HelloWorldProducer that sends two messages on its own thread and
>  * blocks until that thread has finished.
>  */
> void Produce()
> {
>     HelloWorldProducer sender( 2 );
>
>     Thread worker( &sender );
>     worker.start();
>     worker.join();      // wait for the sender to finish before returning
> }
> /**
>  * Runs the first HelloWorldConsumer (10000 ms listening window) on its
>  * own thread and blocks until that thread has finished.
>  */
> void Consumer1()
> {
>     HelloWorldConsumer listener( 10000 );
>
>     Thread worker( &listener );
>     worker.start();
>     worker.join();      // returns once the listening window has elapsed
> }
> /**
>  * Runs the second HelloWorldConsumer (10000 ms listening window) on its
>  * own thread and blocks until that thread has finished.
>  */
> void Consumer2()
> {
>     HelloWorldConsumer listener( 10000 );
>
>     Thread worker( &listener );
>     worker.start();
>     worker.join();      // returns once the listening window has elapsed
> }
> /**
>  * Entry point: publishes the messages first, then runs the two consumers
>  * one after the other. Each call blocks until its worker thread has been
>  * joined, so the consumers do not overlap in time.
>  */
> int main(int argc, char* argv[])
> {
>     Produce();
>
>     Consumer1();
>     Consumer2();
>
>     return 0;
> }
>
--
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7485492
Sent from the ActiveMQ - User mailing list archive at Nabble.com.