Failover transport doesn't detect network failures --------------------------------------------------
Key: AMQCPP-246 URL: https://issues.apache.org/activemq/browse/AMQCPP-246 Project: ActiveMQ C++ Client Issue Type: Bug Components: CMS Impl Affects Versions: 3.1 Environment: CMS Consumer running under Linux (Fedora Core 10) Broker and JMS producer running under Linux (Redhat 7) Reporter: Daniel Assignee: Timothy Bish I tested the CMS SimpleAsyncConsumer example to check the failover transport funcionality and it doesn't detect the network failueres, and after network restauration the consumer never receives any message. This is the SimpleAsyncConsumer.cpp code (The same as the sample included in the las CMS 3.1 RC version, but modifying the brokerURI variable ): #include <decaf/lang/Thread.h> #include <decaf/lang/Runnable.h> #include <decaf/util/concurrent/CountDownLatch.h> #include <activemq/core/ActiveMQConnectionFactory.h> #include <activemq/transport/failover/FailoverTransport.h> #include <activemq/library/ActiveMQCPP.h> #include <decaf/lang/Integer.h> #include <activemq/util/Config.h> #include <decaf/util/Date.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/BytesMessage.h> #include <cms/MapMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h> #include <stdlib.h> #include <iostream> using namespace activemq; using namespace activemq::core; using namespace decaf::lang; using namespace decaf::util; using namespace decaf::util::concurrent; using namespace activemq::transport::failover; using namespace cms; using namespace std; //////////////////////////////////////////////////////////////////////////////// class SimpleAsyncConsumer : public ExceptionListener, public MessageListener { private: Connection* connection; Session* session; Destination* destination; MessageConsumer* consumer; bool useTopic; bool clientAck; std::string brokerURI; std::string destURI; public: SimpleAsyncConsumer( const std::string& brokerURI, const std::string& destURI, bool useTopic = false, bool clientAck = false ) { connection = NULL; session = NULL; destination = NULL; consumer = NULL; this->useTopic = useTopic; this->brokerURI = brokerURI; this->destURI = destURI; this->clientAck = clientAck; } virtual ~SimpleAsyncConsumer(){ cleanup(); } void runConsumer() { try { // Create a ConnectionFactory ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( brokerURI ); // Create a Connection connection = connectionFactory->createConnection(); delete connectionFactory; connection->start(); connection->setExceptionListener(this); // Create a Session if( clientAck ) { session = connection->createSession( Session::CLIENT_ACKNOWLEDGE ); } else { session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); } // Create the destination (Topic or Queue) if( useTopic ) { destination = session->createTopic( destURI ); } else { destination = session->createQueue( destURI ); } // Create a MessageConsumer from the Session to the Topic or Queue consumer = session->createConsumer( destination ); consumer->setMessageListener( this ); } catch (CMSException& e) { e.printStackTrace(); } } // Called from the consumer since this class is a registered MessageListener. virtual void onMessage( const Message* message ){ static int count = 0; try { count++; const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message ); string text = ""; if( textMessage != NULL ) { text = textMessage->getText(); } else { text = "NOT A TEXTMESSAGE!"; } if( clientAck ) { message->acknowledge(); } printf( "Message #%d Received: %s\n", count, text.c_str() ); } catch (CMSException& e) { e.printStackTrace(); } } // If something bad happens you see it here as this class is also been // registered as an ExceptionListener with the connection. virtual void onException( const CMSException& ex AMQCPP_UNUSED) { printf("CMS Exception occurred. Shutting down client.\n"); exit(1); } private: void cleanup(){ //************************************************* // Always close destination, consumers and producers before // you destroy their sessions and connection. //************************************************* // Destroy resources. try{ if( destination != NULL ) delete destination; }catch (CMSException& e) { e.printStackTrace(); } destination = NULL; try{ if( consumer != NULL ) delete consumer; }catch (CMSException& e) { e.printStackTrace(); } consumer = NULL; // Close open resources. try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch (CMSException& e) { e.printStackTrace(); } // Now Destroy them try{ if( session != NULL ) delete session; }catch (CMSException& e) { e.printStackTrace(); } session = NULL; try{ if( connection != NULL ) delete connection; }catch (CMSException& e) { e.printStackTrace(); } connection = NULL; } }; //////////////////////////////////////////////////////////////////////////////// int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { activemq::library::ActiveMQCPP::initializeLibrary(); std::cout << "=====================================================\n"; std::cout << "Starting the example:" << std::endl; std::cout << "-----------------------------------------------------\n"; // Set the URI to point to the IPAddress of your broker. // add any optional params to the url to enable things like // tightMarshalling or tcp logging etc. See the CMS web site for // a full list of configuration options. // // http://activemq.apache.org/cms/ // // Wire Format Options: // ===================== // Use either stomp or openwire, the default ports are different for each // // Examples: // tcp://127.0.0.1:61616 default to openwire // tcp://127.0.0.1:61616?wireFormat=openwire same as above // tcp://127.0.0.1:61613?wireFormat=stomp use stomp instead // std::string brokerURI = "failover:(tcp://192.168.240.211:61616)"; //============================================================ // This is the Destination Name and URI options. Use this to // customize where the consumer listens, to have the consumer // use a topic or queue set the 'useTopics' flag. //============================================================ std::string destURI = "Monitor"; //?consumer.prefetchSize=1"; //============================================================ // set to true to use topics instead of queues // Note in the code above that this causes createTopic or // createQueue to be used in the consumer. //============================================================ bool useTopics = false; //============================================================ // set to true if you want the consumer to use client ack mode // instead of the default auto ack mode. //============================================================ bool clientAck = false; // Create the consumer SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck ); // Start it up and it will listen forever. consumer.runConsumer(); // Wait to exit. std::cout << "Press 'q' to quit" << std::endl; while( std::cin.get() != 'q') { } std::cout << "-----------------------------------------------------\n"; std::cout << "Finished with the example." << std::endl; std::cout << "=====================================================\n"; activemq::library::ActiveMQCPP::shutdownLibrary(); } -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.