Hey,
When I disconnect and close down a producer and afterwards try to
reconnect, the cpp library crashes. I don't know what I'm doing wrong.
See the code below (choose option R to reproduce):
#include <iostream>
#include <stdio.h>
#include <string.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
using namespace std;
class SimpleProducer: public ExceptionListener, public
DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
std::string brokerURI;
std::string destURI;
bool isClosed;
public:
SimpleProducer(const std::string& brokerURI, const std::string& destURI,
bool useTopic, bool clientAck) {
activemq::library::ActiveMQCPP::initializeLibrary();
this->connection = NULL;
this->session = NULL;
this->destination = NULL;
this->producer = NULL;
this->useTopic = useTopic;
this->brokerURI = brokerURI;
this->destURI = destURI;
this->clientAck = clientAck;
this->isClosed = true;
}
virtual ~SimpleProducer() {
close();
}
void close() {
if (!isClosed) {
isClosed = true;
this->cleanup();
}
}
virtual void connect() {
try {
try {
this->isClosed = false;
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new
ActiveMQConnectionFactory(this->brokerURI);
// Create a Connection
this->connection =
connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*>
(connection);
if (amqConnection != NULL) {
amqConnection->addTransportListener(this);
}
//This will crash the 2nd time!:
this->connection->start();
this->connection->setExceptionListener(this);
} catch (Exception& e) {
throw e;
}
// Create a Session
if (this->clientAck) {
this->session =
connection->createSession(Session::CLIENT_ACKNOWLEDGE);
} else {
this->session =
connection->createSession(Session::AUTO_ACKNOWLEDGE);
}
// Create the destination (Topic or Queue)
if (this->useTopic) {
this->destination =
this->session->createTopic(this->destURI);
} else {
this->destination =
this->session->createQueue(this->destURI);
}
// Create a MessageProducer from the Session to the
Topic or Queue
this->producer =
this->session->createProducer(this->destination);
this->producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
fprintf(stdout, "Producer connected to %s",
this->brokerURI.c_str());
fflush(stdout);
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
virtual void send(const char *msg) {
try {
string text(msg);
TextMessage* message =
this->session->createTextMessage(text);
char logMessage[256];
this->producer->send(message);
//logging:
sprintf(logMessage, "Message '%s' send to serviceBus",
msg);
//end logging
delete message;
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
}
//Exception Listener
virtual void onException(const CMSException& ex AMQCPP_UNUSED ) {
fprintf(stdout, "!! CMS lib Exception occured !!");
fflush(stdout);
}
//Transport Listener
virtual void transportInterrupted() {
fprintf(stdout, "The Connection's Transport has been
Interrupted.");
fflush(stdout);
}
virtual void transportResumed() {
fprintf(stdout, "The Connection's Transport has been
Restored.");
fflush(stdout);
}
private:
void cleanup() {
//Producer
try {
if (producer != NULL) {
producer->close();
delete producer;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
producer = NULL;
// Destination
try {
if (destination != NULL) {
delete destination;
}
} catch (CMSException& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
destination = NULL;
//SESSION
try {
if (session != NULL) {
session->close();
delete session;
}
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
session = NULL;
//CONNECTION
try {
if (connection != NULL) {
connection->close();
delete connection;
}
connection = NULL;
} catch (Exception& e) {
fprintf(stdout, e.getMessage().c_str());
fflush(stdout);
}
connection = NULL;
activemq::library::ActiveMQCPP::shutdownLibrary();
}
};//class SimpleProducer
// The single producer instance driven by the interactive loop in main().
SimpleProducer* amq_producer = NULL;
void connect(void) {
std::string _brokerURI = "failover:tcp://10.1.1.9:61616";
std::string _destURI = "TEST.FOO";
amq_producer = new SimpleProducer(_brokerURI, _destURI, true, true);
amq_producer->connect();
}
/**
 * Destroys the current producer, if any, and clears the global pointer.
 *
 * The SimpleProducer destructor calls close(), so no explicit close() is
 * needed. Resetting the pointer keeps a second disconnect() from deleting
 * the same object twice.
 */
void disconnect(void) {
    delete amq_producer;   // deleting a null pointer is a no-op
    amq_producer = NULL;
}
/**
 * Sends the fixed test message through the current producer.
 * Does nothing but print a notice when no producer has been created yet.
 */
void send(void) {
    if (amq_producer == NULL) {
        fprintf(stdout, "No producer available; call connect() first");
        fflush(stdout);
        return;
    }
    amq_producer->send("TEST MESSAGE");
}
/**
 * Interactive driver: connects once, then reads one command per line.
 *   P - send a test message
 *   R - tear down the producer and build a new one
 *   Q - quit
 * The loop also ends when standard input reaches end-of-file or fails;
 * previously a failed getline left the command unchanged and the loop spun
 * forever.
 */
int main(int argc, const char* argv[]) {
    (void) argc;
    (void) argv;

    std::string command;
    connect();
    for (;;) {
        printf("\n P = to produce\n R = to reset Producer\n Q = quit\n");
        if (!std::getline(std::cin, command) || command == "Q") {
            break;
        }
        if (command == "P") {
            send();
        } else if (command == "R") {
            disconnect();
            connect();
        }
    }
    disconnect();
    return 0;
}
--
View this message in context:
http://activemq.2283324.n4.nabble.com/activemq-cpp-library-3-2-2-reconnect-fails-tp3046566p3046566.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.