I took exactly the same code from examples, but instead of asynchronous, i
want synchronous client. So all i changed was remove tge listeners and
return the message to runConsumer method.
I keep getting segmentation fault.
Can you advise?
Below is the code I m running:
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class YConsumer{
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic;
std::string brokerURI;
std::string destURI;
bool clientAck;
public:
YConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
clientAck(clientAck) {
}
virtual ~YConsumer() throw() {
this->cleanup();
}
void close() {
this->cleanup();
}
string runConsumer() {
try {
auto_ptr<ConnectionFactory>
connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )
);
connection = connectionFactory->createConnection();
connection->start();
connection->start();
if( clientAck ) {
session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
}
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
consumer = session->createConsumer( destination );
Message* message = consumer->receive();
TextMessage* textMessage = dynamic_cast<TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
//std::cout<<text.c_str()<<std::endl;
//delete message;
//delete textMessage;
return text.c_str();
} catch (CMSException& e) {
e.printStackTrace();
}
}
private:
void cleanup(){
//*************************************************
// Always close destination, consumers and producers before
// you destroy their sessions and connection.
//*************************************************
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {e.printStackTrace();}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {e.printStackTrace();}
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {e.printStackTrace();}
// Now Destroy them
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {e.printStackTrace();}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {e.printStackTrace();}
connection = NULL;
}
};
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
for(int i=0; i< 100;i++){
activemq::library::ActiveMQCPP::initializeLibrary();
std::string brokerURI =
"failover:(tcp://127.0.0.1:61613?wireFormat=stomp)";
std::string destURI = "TEST.Prototype";
bool useTopics = false;
bool clientAck = false;
YConsumer consumer( brokerURI, destURI, useTopics, clientAck );
string response = consumer.runConsumer();
std::cout<< response<<std::endl;
consumer.close();
//activemq::library::ActiveMQCPP::shutdownLibrary();
}
}
--
View this message in context:
http://activemq.2283324.n4.nabble.com/The-Decaf-Threading-API-is-in-a-Shutdown-State-Exception-tp4077689p4077852.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.