Hey,

When I disconnected and closed down a producer. And afterwards I'm trying to
reconnect, the cpp-library crashes.  I don't know what I'm doing wrong.

See the code below (choose option R)



#include <iostream>
#include <stdio.h>
#include <string.h>

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>

using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

using namespace std;

class SimpleProducer: public ExceptionListener, public
DefaultTransportListener {
private:

        Connection* connection;
        Session* session;
        Destination* destination;
        MessageProducer* producer;
        bool useTopic;
        bool clientAck;
        std::string brokerURI;
        std::string destURI;
        bool isClosed;

public:
        SimpleProducer(const std::string& brokerURI, const std::string& destURI,
bool useTopic, bool clientAck) {
                activemq::library::ActiveMQCPP::initializeLibrary();
                this->connection = NULL;
                this->session = NULL;
                this->destination = NULL;
                this->producer = NULL;
                this->useTopic = useTopic;
                this->brokerURI = brokerURI;
                this->destURI = destURI;
                this->clientAck = clientAck;
                this->isClosed = true;
        }

        virtual ~SimpleProducer() {
                close();
        }

        void close() {
                if (!isClosed) {
                        isClosed = true;
                        this->cleanup();
                }
        }

        virtual void connect() {
                try {
                        try {
                                this->isClosed = false;
                                // Create a ConnectionFactory
                                ActiveMQConnectionFactory* connectionFactory = 
new
ActiveMQConnectionFactory(this->brokerURI);
                                // Create a Connection
                                this->connection = 
connectionFactory->createConnection();
                                delete connectionFactory;
                                ActiveMQConnection* amqConnection = 
dynamic_cast<ActiveMQConnection*>
(connection);
                                if (amqConnection != NULL) {
                                        
amqConnection->addTransportListener(this);
                                }
                                //This will crash the 2nd time!:
                                this->connection->start();
                                this->connection->setExceptionListener(this);

                        } catch (Exception& e) {
                                throw e;
                        }

                        // Create a Session
                        if (this->clientAck) {
                                this->session = 
connection->createSession(Session::CLIENT_ACKNOWLEDGE);
                        } else {
                                this->session = 
connection->createSession(Session::AUTO_ACKNOWLEDGE);
                        }

                        // Create the destination (Topic or Queue)
                        if (this->useTopic) {
                                this->destination = 
this->session->createTopic(this->destURI);
                        } else {
                                this->destination = 
this->session->createQueue(this->destURI);
                        }

                        // Create a MessageProducer from the Session to the 
Topic or Queue
                        this->producer = 
this->session->createProducer(this->destination);

                        
this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

                        fprintf(stdout, "Producer connected to %s", 
this->brokerURI.c_str());
                        fflush(stdout);

                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
        }

        virtual void send(const char *msg) {
                try {

                        string text(msg);
                        TextMessage* message = 
this->session->createTextMessage(text);

                        char logMessage[256];

                        this->producer->send(message);

                        //logging:
                        sprintf(logMessage, "Message '%s' send to serviceBus", 
msg);

                        //end logging
                        delete message;
                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
        }
        //Exception Listener
        virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
                fprintf(stdout, "!! CMS lib Exception occured !!");
                fflush(stdout);
        }
        //Transport Listener
        virtual void transportInterrupted() {
                fprintf(stdout, "The Connection's Transport has been 
Interrupted.");
                fflush(stdout);
        }
        virtual void transportResumed() {
                fprintf(stdout, "The Connection's Transport has been 
Restored.");
                fflush(stdout);
        }
private:

        void cleanup() {
                //Producer
                try {
                        if (producer != NULL) {
                                producer->close();
                                delete producer;
                        }
                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);

                }
                producer = NULL;

                // Destination
                try {
                        if (destination != NULL) {
                                delete destination;
                        }
                } catch (CMSException& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
                destination = NULL;

                //SESSION
                try {
                        if (session != NULL) {
                                session->close();
                                delete session;
                        }
                } catch (Exception& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
                session = NULL;

                //CONNECTION
                try {
                        if (connection != NULL) {
                                connection->close();
                                delete connection;
                        }
                        connection = NULL;
                } catch (Exception& e) {
                        fprintf(stdout, e.getMessage().c_str());
                        fflush(stdout);
                }
                connection = NULL;
                activemq::library::ActiveMQCPP::shutdownLibrary();
        }

};//class SimpleProducer


SimpleProducer *amq_producer;
void connect(void) {
        std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
        std::string _destURI = "TEST.FOO";
        amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
        amq_producer->connect();
}

void disconnect(void) {
        if(amq_producer != NULL){
                amq_producer->close();
        }
        delete amq_producer;
}
void send(void) {
        amq_producer->send("TEST MESSAGE");
}

int main(int argc, const char* argv[]) {
        string str;
        connect();
        while (str.compare("Q") != 0) {
                printf("\n  P  = to produce\n  R = to reset Producer\n  Q  = 
quit\n");
                getline(cin, str);

                if (str.compare("P") == 0) {
                        send();
                } else if (str.compare("R") == 0) {
                        disconnect();
                        connect();
                }
        }
        disconnect();
        return 0;
}



-- 
View this message in context: 
http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to