Repository: activemq-cpp Updated Branches: refs/heads/master b61e95c47 -> ce523a140
https://issues.apache.org/jira/browse/AMQCPP-589 https://issues.apache.org/jira/browse/AMQCPP-588 Fix NPE error. Update onMessage processing to always incluide the error cause exception so that the poison cause in DLQ contains the original error message. Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/ce523a14 Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/ce523a14 Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/ce523a14 Branch: refs/heads/master Commit: ce523a140112d4d654e8ae660893e09bf9ffd7c3 Parents: b61e95c Author: Timothy Bish <tabish...@gmail.com> Authored: Mon Nov 23 12:15:55 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Mon Nov 30 14:07:39 2015 -0500 ---------------------------------------------------------------------- .../core/kernels/ActiveMQConsumerKernel.cpp | 32 +- activemq-cpp/src/test-integration/Makefile.am | 2 + .../src/test-integration/TestRegistry.cpp | 2 + .../OpenWireMessageListenerRedeliveryTest.cpp | 436 +++++++++++++++++++ .../OpenWireMessageListenerRedeliveryTest.h | 56 +++ 5 files changed, 525 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/ce523a14/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp index 25a08c5..c54cac0 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp @@ -482,6 +482,24 @@ namespace { }; /** + * ActiveMQAckHandler used to support Managed Acknowledge modes. + */ + class NoOpAckHandler : public ActiveMQAckHandler { + private: + + NoOpAckHandler(const NoOpAckHandler&); + NoOpAckHandler& operator=(const NoOpAckHandler&); + + public: + + NoOpAckHandler() { + } + + void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) { + } + }; + + /** * ActiveMQAckHandler used to support Client Acknowledge mode. */ class ClientAckHandler : public ActiveMQAckHandler { @@ -1520,9 +1538,14 @@ void ActiveMQConsumerKernel::rollback() { Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON, this->internal->deliveredMessages.size())); ack->setFirstMessageId(firstMsgId); + std::string message = "Exceeded RedeliveryPolicy max redelivery limit: " + - Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()) + - " cause: " + lastMsg->getRollbackCause().getMessage(); + Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()); + if (!lastMsg->getRollbackCause().getMessage().empty()) { + message.append(" cause: Exception -> "); + message.append(lastMsg->getRollbackCause().getMessage()); + } + ack->setPoisonCause(internal->createBrokerError(message)); session->sendAck(ack, true); // Adjust the window size. @@ -1622,9 +1645,9 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch) } afterMessageIsConsumed(dispatch, expired); } catch (RuntimeException& e) { + dispatch->setRollbackCause(e); if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) { // Schedule redelivery and possible DLQ processing - dispatch->setRollbackCause(e); rollback(); } else { // Transacted or Client ack: Deliver the next message. @@ -1711,6 +1734,9 @@ Pointer<cms::Message> ActiveMQConsumerKernel::createCMSMessage(Pointer<MessageDi } else if (session->isIndividualAcknowledge()) { Pointer<ActiveMQAckHandler> ackHandler(new IndividualAckHandler(this, dispatch)); message->setAckHandler(ackHandler); + } else { + Pointer<ActiveMQAckHandler> ackHandler(new NoOpAckHandler()); + message->setAckHandler(ackHandler); } return message.dynamicCast<cms::Message>(); http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/ce523a14/activemq-cpp/src/test-integration/Makefile.am ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/Makefile.am b/activemq-cpp/src/test-integration/Makefile.am index 273c4d4..38e31bf 100644 --- a/activemq-cpp/src/test-integration/Makefile.am +++ b/activemq-cpp/src/test-integration/Makefile.am @@ -35,6 +35,7 @@ cc_sources = \ activemq/test/TransactionTest.cpp \ activemq/test/VirtualTopicTest.cpp \ activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.cpp \ + activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp \ activemq/test/openwire/OpenWireRedeliveryPolicyTest.cpp \ activemq/test/openwire/OpenwireAdvisorysTest.cpp \ activemq/test/openwire/OpenwireAsyncSenderTest.cpp \ @@ -98,6 +99,7 @@ h_sources = \ activemq/test/TransactionTest.h \ activemq/test/VirtualTopicTest.h \ activemq/test/openwire/OpenWireCmsSendWithAsyncCallbackTest.h \ + activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h \ activemq/test/openwire/OpenWireRedeliveryPolicyTest.h \ activemq/test/openwire/OpenwireAdvisorysTest.h \ activemq/test/openwire/OpenwireAsyncSenderTest.h \ http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/ce523a14/activemq-cpp/src/test-integration/TestRegistry.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp index deaf816..5ac2c56 100644 --- a/activemq-cpp/src/test-integration/TestRegistry.cpp +++ b/activemq-cpp/src/test-integration/TestRegistry.cpp @@ -28,6 +28,7 @@ #include "activemq/test/openwire/OpenwireJmsMessageGroupsTest.h" #include "activemq/test/openwire/OpenwireJmsRecoverTest.h" #include "activemq/test/openwire/OpenwireMessageCompressionTest.h" +#include "activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h" #include "activemq/test/openwire/OpenwireMessagePriorityTest.h" #include "activemq/test/openwire/OpenwireMapMessageTest.h" #include "activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h" @@ -68,6 +69,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsMessageGro CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsRecoverTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest ); +CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireMessageListenerRedeliveryTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriorityTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest ); http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/ce523a14/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp new file mode 100644 index 0000000..8a77c48 --- /dev/null +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.cpp @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "OpenWireMessageListenerRedeliveryTest.h" + +#include <cms/ConnectionFactory.h> +#include <cms/Connection.h> +#include <cms/Session.h> + +#include <activemq/core/policies/DefaultRedeliveryPolicy.h> +#include <activemq/core/ActiveMQConnectionFactory.h> +#include <activemq/core/ActiveMQConnection.h> + +#include <decaf/lang/Pointer.h> +#include <decaf/lang/Thread.h> +#include <decaf/lang/Runnable.h> +#include <decaf/util/ArrayList.h> +#include <decaf/util/concurrent/TimeUnit.h> +#include <decaf/util/concurrent/CountDownLatch.h> +#include <decaf/util/concurrent/atomic/AtomicInteger.h> + +#include <memory> + +using namespace cms; +using namespace std; +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::lang::exceptions; +using namespace decaf::util; +using namespace decaf::util::concurrent; +using namespace decaf::util::concurrent::atomic; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::test; +using namespace activemq::test::openwire; + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + static std::string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause"; + + cms::Connection* createConnection(const std::string& brokerUri) { + + ActiveMQConnectionFactory factory(brokerUri); + + factory.getRedeliveryPolicy()->setInitialRedeliveryDelay(0); + factory.getRedeliveryPolicy()->setRedeliveryDelay(1000); + factory.getRedeliveryPolicy()->setMaximumRedeliveries(3); + factory.getRedeliveryPolicy()->setBackOffMultiplier((short) 2); + factory.getRedeliveryPolicy()->setUseExponentialBackOff(true); + + return factory.createConnection(); + } + + class TestMessageListener : public cms::MessageListener { + private: + + int counter; + cms::Session* session; + + public: + + TestMessageListener(cms::Session* session) : counter(0), session(session) {} + + int getCounter() { + return counter; + } + + virtual void onMessage(const cms::Message* message) { + + try { + counter++; + + if (counter <= 4) { + session->rollback(); + } else { + message->acknowledge(); + session->commit(); + } + } catch (CMSException& e) { + } + } + }; + + class ExceptionMessageListener : public cms::MessageListener { + private: + + CountDownLatch doneLatch; + ArrayList<std::string> received; + std::string testName; + int maxDeliveries; + int count; + + public: + + ExceptionMessageListener(const std::string& testName, int messageCount, int maxDeliveries) : + doneLatch(messageCount), + received(), + testName(testName), + maxDeliveries(maxDeliveries), + count(0) { + } + + int getCount() { + return count; + } + + ArrayList<std::string>& getReceived() { + return received; + } + + bool await(long long timeout, const TimeUnit& unit) { + return doneLatch.await(timeout, unit); + } + + virtual void onMessage(const cms::Message* message) { + try { + const TextMessage* textMessage = dynamic_cast<const cms::TextMessage*>(message); + received.add(textMessage->getText()); + } catch (cms::CMSException& e) { + e.printStackTrace(); + } + + if (++count < maxDeliveries) { + throw decaf::lang::exceptions::RuntimeException( + __FILE__, __LINE__, testName.append(" forced a redelivery").c_str()); + } + + // new blood + count = 0; + doneLatch.countDown(); + } + }; + + class TrackingMessageListener : public cms::MessageListener { + private: + + CountDownLatch doneLatch; + ArrayList< Pointer<cms::Message> > received; + std::string testName; + int count; + + public: + + TrackingMessageListener(const std::string& testName) : + doneLatch(1), + received(), + testName(testName), + count(0) { + } + + TrackingMessageListener(const std::string& testName, int expected) : + doneLatch(expected), + received(), + testName(testName), + count(0) { + } + + int getCount() { + return received.size(); + } + + ArrayList< Pointer<cms::Message> >& getReceived() { + return received; + } + + bool await(long long timeout, const TimeUnit& unit) { + return doneLatch.await(timeout, unit); + } + + virtual void onMessage(const cms::Message* message) { + try { + Pointer<cms::Message> copy(message->clone()); + received.add(copy); + doneLatch.countDown(); + } catch (cms::CMSException& e) { + e.printStackTrace(); + } + } + }; + + class FailingMessageListener : public cms::MessageListener { + private: + + CountDownLatch doneLatch; + cms::Session* session; + std::string testName; + + public: + + FailingMessageListener(cms::Session* session, const std::string& testName, int expected) : + doneLatch(expected), + session(session), + testName(testName) { + } + + bool await(long long timeout, const TimeUnit& unit) { + return doneLatch.await(timeout, unit); + } + + virtual void onMessage(const cms::Message* message) { + try { + doneLatch.countDown(); + if (session->isTransacted()) { + session->rollback(); + } + } catch (cms::CMSException& e) { + e.printStackTrace(); + } + + throw decaf::lang::exceptions::RuntimeException( + __FILE__, __LINE__, (testName + " forced a redelivery").c_str()); + } + }; + +} + +//////////////////////////////////////////////////////////////////////////////// +OpenWireMessageListenerRedeliveryTest::OpenWireMessageListenerRedeliveryTest() { +} + +//////////////////////////////////////////////////////////////////////////////// +OpenWireMessageListenerRedeliveryTest::~OpenWireMessageListenerRedeliveryTest() { +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenWireMessageListenerRedeliveryTest::testQueueRollbackConsumerListener() { + + std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL())); + connection->start(); + + std::auto_ptr<Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); + std::auto_ptr<Queue> queue(session->createQueue("testQueueRollbackConsumerListener")); + std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get())); + std::auto_ptr<Message> message(session->createTextMessage("Hello")); + + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); + amqConnection->destroyDestination(queue.get()); + + producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT); + producer->send(message.get()); + session->commit(); + + std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get())); + + TestMessageListener listener(session.get()); + consumer->setMessageListener(&listener); + + TimeUnit::MILLISECONDS.sleep(500); + + // first try.. should get 2 since there is no delay on the first redelivery. + CPPUNIT_ASSERT_EQUAL(2, listener.getCounter()); + + TimeUnit::MILLISECONDS.sleep(1000); + + // 2nd redeliver (redelivery after 1 sec) + CPPUNIT_ASSERT_EQUAL(3, listener.getCounter()); + + TimeUnit::MILLISECONDS.sleep(2000); + + // 3rd redeliver (redelivery after 2 seconds) - it should give up after that + CPPUNIT_ASSERT_EQUAL(4, listener.getCounter()); + + // create new message + std::auto_ptr<Message> secondMessage(session->createTextMessage("Hello 2")); + producer->send(secondMessage.get()); + session->commit(); + + TimeUnit::MILLISECONDS.sleep(500); + + // it should be committed, so no redelivery + CPPUNIT_ASSERT_EQUAL(5, listener.getCounter()); + + TimeUnit::MILLISECONDS.sleep(1500); + + // no redelivery, counter should still be 4 + CPPUNIT_ASSERT_EQUAL(5, listener.getCounter()); + + connection->close(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenWireMessageListenerRedeliveryTest::testQueueSessionListenerExceptionRetry() { + + std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL())); + connection->start(); + + std::auto_ptr<Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); + std::auto_ptr<Queue> queue(session->createQueue("testQueueSessionListenerExceptionRetry")); + std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get())); + std::auto_ptr<Message> message1(session->createTextMessage("1")); + std::auto_ptr<Message> message2(session->createTextMessage("2")); + + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); + amqConnection->destroyDestination(queue.get()); + + producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT); + producer->send(message1.get()); + producer->send(message2.get()); + + std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get())); + + int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries(); + + ExceptionMessageListener listener("testQueueSessionListenerExceptionRetry", 2, maxDeliveries); + consumer->setMessageListener(&listener); + + CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(30, TimeUnit::SECONDS)); + + for (int i = 0; i < maxDeliveries; i++) { + CPPUNIT_ASSERT_EQUAL_MESSAGE("got first redelivered: " + Integer::toString(i), + std::string("1"), listener.getReceived().get(i)); + } + + for (int i = maxDeliveries; i < maxDeliveries * 2; i++) { + CPPUNIT_ASSERT_EQUAL_MESSAGE("got first redelivered: " + Integer::toString(i), + std::string("2"), listener.getReceived().get(i)); + } + + connection->close(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenWireMessageListenerRedeliveryTest::testQueueSessionListenerExceptionDlq() { + + const std::string TEST_NAME = "testQueueSessionListenerExceptionDlq"; + + std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL())); + connection->start(); + + std::auto_ptr<Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); + std::auto_ptr<Queue> queue(session->createQueue(TEST_NAME)); + std::auto_ptr<Queue> dlq(session->createQueue("ActiveMQ.DLQ")); + + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); + amqConnection->destroyDestination(queue.get()); + amqConnection->destroyDestination(dlq.get()); + + std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get())); + std::auto_ptr<Message> message(session->createTextMessage("1")); + producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT); + producer->send(message.get()); + + // Track messages going to DLQ + TrackingMessageListener dlqListener(TEST_NAME); + std::auto_ptr<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get())); + dlqConsumer->setMessageListener(&dlqListener); + + // Receive and throw + int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries(); + FailingMessageListener listener(session.get(), TEST_NAME, maxDeliveries); + std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get())); + consumer->setMessageListener(&listener); + + CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(20, TimeUnit::SECONDS)); + + // check DLQ + CPPUNIT_ASSERT_MESSAGE("got dlq message", dlqListener.await(30, TimeUnit::SECONDS)); + + // check DLQ message cause is captured + Pointer<cms::Message> dlqMessage = dlqListener.getReceived().get(0); + CPPUNIT_ASSERT_MESSAGE("dlq message captured", dlqMessage != NULL); + String cause = dlqMessage->getStringProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + + CPPUNIT_ASSERT_MESSAGE("cause 'cause' exception is remembered", cause.contains("Exception")); + CPPUNIT_ASSERT_MESSAGE("is correct exception", cause.contains(TEST_NAME)); + CPPUNIT_ASSERT_MESSAGE("cause exception is remembered", cause.contains("JMSException")); + CPPUNIT_ASSERT_MESSAGE("cause policy is remembered", cause.contains("RedeliveryPolicy")); + + connection->close(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenWireMessageListenerRedeliveryTest::testTransactedQueueSessionListenerExceptionDlq() { + + const std::string TEST_NAME = "testTransactedQueueSessionListenerExceptionDlq"; + + std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL())); + connection->start(); + + std::auto_ptr<Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); + std::auto_ptr<Queue> queue(session->createQueue(TEST_NAME)); + std::auto_ptr<Queue> dlq(session->createQueue("ActiveMQ.DLQ")); + + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); + amqConnection->destroyDestination(queue.get()); + amqConnection->destroyDestination(dlq.get()); + + std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get())); + std::auto_ptr<Message> message(session->createTextMessage("1")); + producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT); + producer->send(message.get()); + session->commit(); + + // Track messages going to DLQ + TrackingMessageListener dlqListener(TEST_NAME); + std::auto_ptr<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get())); + dlqConsumer->setMessageListener(&dlqListener); + + // Receive and throw + int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries(); + FailingMessageListener listener(session.get(), TEST_NAME, maxDeliveries); + std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get())); + consumer->setMessageListener(&listener); + + CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(20, TimeUnit::SECONDS)); + + // check DLQ + CPPUNIT_ASSERT_MESSAGE("got dlq message", dlqListener.await(30, TimeUnit::SECONDS)); + + // check DLQ message cause is captured + Pointer<cms::Message> dlqMessage = dlqListener.getReceived().get(0); + CPPUNIT_ASSERT_MESSAGE("dlq message captured", dlqMessage != NULL); + String cause = dlqMessage->getStringProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + + CPPUNIT_ASSERT_MESSAGE("cause 'cause' exception is remembered", cause.contains("Exception")); + CPPUNIT_ASSERT_MESSAGE("is correct exception", cause.contains(TEST_NAME)); + CPPUNIT_ASSERT_MESSAGE("cause exception is remembered", cause.contains("JMSException")); + CPPUNIT_ASSERT_MESSAGE("cause policy is remembered", cause.contains("RedeliveryPolicy")); + + connection->close(); +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/ce523a14/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h new file mode 100644 index 0000000..52aa83f --- /dev/null +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenWireMessageListenerRedeliveryTest.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGELISTENERREDELIVERYTEST_H_ +#define ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGELISTENERREDELIVERYTEST_H_ + +#include <activemq/test/CMSTestFixture.h> +#include <activemq/util/IntegrationCommon.h> + +namespace activemq { +namespace test { +namespace openwire { + + class OpenWireMessageListenerRedeliveryTest : public CMSTestFixture { + private: + + CPPUNIT_TEST_SUITE( OpenWireMessageListenerRedeliveryTest ); + CPPUNIT_TEST( testQueueRollbackConsumerListener ); + CPPUNIT_TEST( testQueueSessionListenerExceptionRetry ); + CPPUNIT_TEST( testQueueSessionListenerExceptionDlq ); + CPPUNIT_TEST( testTransactedQueueSessionListenerExceptionDlq ); + CPPUNIT_TEST_SUITE_END(); + + public: + + OpenWireMessageListenerRedeliveryTest(); + virtual ~OpenWireMessageListenerRedeliveryTest(); + + virtual std::string getBrokerURL() const { + return activemq::util::IntegrationCommon::getInstance().getOpenwireURL(); + } + + void testQueueRollbackConsumerListener(); + void testQueueSessionListenerExceptionRetry(); + void testQueueSessionListenerExceptionDlq(); + void testTransactedQueueSessionListenerExceptionDlq(); + + }; + +}}} + +#endif /* ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGELISTENERREDELIVERYTEST_H_ */