Author: tabish
Date: Thu Oct 21 21:48:12 2010
New Revision: 1026151
URL: http://svn.apache.org/viewvc?rev=1026151&view=rev
Log:
Refactor the shutdown methods in Consumer, Producer and Session to streamline
closing out resources when the parent is closed before the children. This avoids
unneeded message sends and fixes a couple of issues.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Thu Oct 21 21:48:12 2010
@@ -1093,3 +1093,8 @@ bool ActiveMQConnection::isMessagePriori
void ActiveMQConnection::setMessagePrioritySupported( bool value ) {
this->config->messagePrioritySupported = value;
}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Exception* ActiveMQConnection::getFirstFailureError() const {
+ return this->config->firstFailureError.get();
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
Thu Oct 21 21:48:12 2010
@@ -644,6 +644,13 @@ namespace core{
*/
void setTransportInterruptionProcessingComplete();
+ /**
+ * Gets the pointer to the first exception that caused the Connection
to become failed.
+ *
+ * @returns pointer to an Exception instance or NULL if none is set.
+ */
+ decaf::lang::Exception* getFirstFailureError() const;
+
private:
// Sends a oneway disconnect message to the broker.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
Thu Oct 21 21:48:12 2010
@@ -366,6 +366,24 @@ void ActiveMQConsumer::doClose() {
try {
+ dispose();
+ // Remove at the Broker Side, consumer has been removed from the local
+ // Session and Connection objects so if the remote call to remove
throws
+ // it is okay to propagate to the client.
+ Pointer<RemoveInfo> info( new RemoveInfo );
+ info->setObjectId( this->consumerInfo->getConsumerId() );
+ info->setLastDeliveredSequenceId(
this->internal->lastDeliveredSequenceId );
+ this->session->oneway( info );
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::dispose() {
+
+ try{
if( !this->isClosed() ) {
if( !session->isTransacted() ) {
@@ -380,7 +398,7 @@ void ActiveMQConsumer::doClose() {
// Purge all the pending messages
try{
- internal->unconsumedMessages->clear();
+ this->internal->unconsumedMessages->clear();
} catch ( ActiveMQException& ex ){
if( !haveException ){
ex.setMark( __FILE__, __LINE__ );
@@ -390,7 +408,7 @@ void ActiveMQConsumer::doClose() {
}
// Stop and Wakeup all sync consumers.
- internal->unconsumedMessages->close();
+ this->internal->unconsumedMessages->close();
if( this->session->isIndividualAcknowledge() ) {
// For IndividualAck Mode we need to unlink the ack handler to
remove a
@@ -406,15 +424,7 @@ void ActiveMQConsumer::doClose() {
}
// Remove this Consumer from the Connections set of Dispatchers
- this->session->removeConsumer(
this->consumerInfo->getConsumerId(), this->internal->lastDeliveredSequenceId );
-
- // Remove at the Broker Side, consumer has been removed from the
local
- // Session and Connection objects so if the remote call to remove
throws
- // it is okay to propagate to the client.
- Pointer<RemoveInfo> info( new RemoveInfo );
- info->setObjectId( this->consumerInfo->getConsumerId() );
- info->setLastDeliveredSequenceId(
this->internal->lastDeliveredSequenceId );
- this->session->oneway( info );
+ this->session->removeConsumer( this->consumerInfo->getConsumerId()
);
// If we encountered an error, propagate it.
if( haveException ){
@@ -650,8 +660,8 @@ void ActiveMQConsumer::beforeMessageIsCo
if( !isAutoAcknowledgeBatch() ) {
// When not in an Auto
- synchronized( &internal->dispatchedMessages ) {
- internal->dispatchedMessages.enqueueFront( dispatch );
+ synchronized( &this->internal->dispatchedMessages ) {
+ this->internal->dispatchedMessages.enqueueFront( dispatch );
}
if( this->session->isTransacted() ) {
@@ -666,7 +676,7 @@ void ActiveMQConsumer::afterMessageIsCon
try{
- if( internal->unconsumedMessages->isClosed() ) {
+ if( this->internal->unconsumedMessages->isClosed() ) {
return;
}
@@ -680,13 +690,13 @@ void ActiveMQConsumer::afterMessageIsCon
if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
- synchronized( &internal->dispatchedMessages ) {
- if( !internal->dispatchedMessages.empty() ) {
+ synchronized( &this->internal->dispatchedMessages ) {
+ if( !this->internal->dispatchedMessages.empty() ) {
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages(
ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack != NULL ) {
- internal->dispatchedMessages.clear();
+ this->internal->dispatchedMessages.clear();
session->oneway( ack );
}
}
@@ -702,7 +712,7 @@ void ActiveMQConsumer::afterMessageIsCon
bool messageUnackedByConsumer = false;
- synchronized( &internal->dispatchedMessages ) {
+ synchronized( &this->internal->dispatchedMessages ) {
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
if( iter->next() == message ) {
@@ -736,21 +746,21 @@ void ActiveMQConsumer::deliverAcks() {
if( isAutoAcknowledgeEach() ) {
- synchronized( &internal->dispatchedMessages ) {
+ synchronized( &this->internal->dispatchedMessages ) {
ack = makeAckForAllDeliveredMessages(
ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack != NULL ) {
- internal->dispatchedMessages.clear();
+ this->internal->dispatchedMessages.clear();
} else {
ack.swap( internal->pendingAck );
}
}
- } else if( internal->pendingAck != NULL &&
- internal->pendingAck->getAckType() ==
ActiveMQConstants::ACK_TYPE_CONSUMED ) {
+ } else if( this->internal->pendingAck != NULL &&
+ this->internal->pendingAck->getAckType() ==
ActiveMQConstants::ACK_TYPE_CONSUMED ) {
- ack.swap( internal->pendingAck );
+ ack.swap( this->internal->pendingAck );
}
if( ack != NULL ) {
@@ -776,8 +786,8 @@ void ActiveMQConsumer::ackLater( const P
// consumer got the message to expand the pre-fetch window
if( session->isTransacted() ) {
session->doStartTransaction();
- if( !internal->synchronizationRegistered ) {
- internal->synchronizationRegistered = true;
+ if( !this->internal->synchronizationRegistered ) {
+ this->internal->synchronizationRegistered = true;
Pointer<Synchronization> sync( new TransactionSynhcronization(
this ) );
this->session->getTransactionContext()->addSynchronization( sync );
@@ -786,20 +796,20 @@ void ActiveMQConsumer::ackLater( const P
// The delivered message list is only needed for the recover method
// which is only used with client ack.
- internal->deliveredCounter++;
+ this->internal->deliveredCounter++;
- Pointer<MessageAck> oldPendingAck = internal->pendingAck;
- internal->pendingAck.reset( new MessageAck() );
- internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
- internal->pendingAck->setAckType( (unsigned char)ackType );
- internal->pendingAck->setDestination( dispatch->getDestination() );
- internal->pendingAck->setLastMessageId(
dispatch->getMessage()->getMessageId() );
- internal->pendingAck->setMessageCount( internal->deliveredCounter );
+ Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
+ this->internal->pendingAck.reset( new MessageAck() );
+ this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
+ this->internal->pendingAck->setAckType( (unsigned char)ackType );
+ this->internal->pendingAck->setDestination( dispatch->getDestination() );
+ this->internal->pendingAck->setLastMessageId(
dispatch->getMessage()->getMessageId() );
+ this->internal->pendingAck->setMessageCount( internal->deliveredCounter );
if( oldPendingAck == NULL ) {
- internal->pendingAck->setFirstMessageId(
internal->pendingAck->getLastMessageId() );
- } else if ( oldPendingAck->getAckType() ==
internal->pendingAck->getAckType() ) {
- internal->pendingAck->setFirstMessageId(
oldPendingAck->getFirstMessageId() );
+ this->internal->pendingAck->setFirstMessageId(
this->internal->pendingAck->getLastMessageId() );
+ } else if ( oldPendingAck->getAckType() ==
this->internal->pendingAck->getAckType() ) {
+ this->internal->pendingAck->setFirstMessageId(
oldPendingAck->getFirstMessageId() );
} else {
// old pending ack being superseded by ack of another type, if is is
not a delivered
// ack and hence important, send it now so it is not lost.
@@ -809,33 +819,33 @@ void ActiveMQConsumer::ackLater( const P
}
if( session->isTransacted() ) {
- internal->pendingAck->setTransactionId(
this->session->getTransactionContext()->getTransactionId() );
+ this->internal->pendingAck->setTransactionId(
this->session->getTransactionContext()->getTransactionId() );
}
if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= (
internal->deliveredCounter - internal->additionalWindowSize ) ) {
session->oneway( this->internal->pendingAck );
- internal->pendingAck.reset( NULL );
- internal->deliveredCounter = 0;
- internal->additionalWindowSize = 0;
+ this->internal->pendingAck.reset( NULL );
+ this->internal->deliveredCounter = 0;
+ this->internal->additionalWindowSize = 0;
}
}
////////////////////////////////////////////////////////////////////////////////
Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type
) {
- synchronized( &internal->dispatchedMessages ) {
+ synchronized( &this->internal->dispatchedMessages ) {
- if( !internal->dispatchedMessages.empty() ) {
+ if( !this->internal->dispatchedMessages.empty() ) {
- Pointer<MessageDispatch> dispatched =
internal->dispatchedMessages.front();
+ Pointer<MessageDispatch> dispatched =
this->internal->dispatchedMessages.front();
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( (unsigned char)type );
ack->setConsumerId( dispatched->getConsumerId() );
ack->setDestination( dispatched->getDestination() );
- ack->setMessageCount( (int)internal->dispatchedMessages.size() );
+ ack->setMessageCount(
(int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
- ack->setFirstMessageId(
internal->dispatchedMessages.back()->getMessage()->getMessageId() );
+ ack->setFirstMessageId(
this->internal->dispatchedMessages.back()->getMessage()->getMessageId() );
return ack;
}
@@ -863,7 +873,7 @@ void ActiveMQConsumer::acknowledge( cons
session->oneway( ack );
- synchronized( &internal->dispatchedMessages ) {
+ synchronized( &this->internal->dispatchedMessages ) {
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
if( iter->next() == dispatch ) {
@@ -887,7 +897,7 @@ void ActiveMQConsumer::acknowledge() {
try{
- synchronized( &internal->dispatchedMessages ) {
+ synchronized( &this->internal->dispatchedMessages ) {
// Acknowledge all messages so far.
Pointer<MessageAck> ack =
@@ -907,9 +917,9 @@ void ActiveMQConsumer::acknowledge() {
// Adjust the counters
this->internal->deliveredCounter =
- Math::max( 0, internal->deliveredCounter -
(int)internal->dispatchedMessages.size());
+ Math::max( 0, this->internal->deliveredCounter -
(int)this->internal->dispatchedMessages.size());
this->internal->additionalWindowSize =
- Math::max(0, internal->additionalWindowSize -
(int)internal->dispatchedMessages.size());
+ Math::max(0, this->internal->additionalWindowSize -
(int)this->internal->dispatchedMessages.size());
if( !session->isTransacted() ) {
this->internal->dispatchedMessages.clear();
@@ -931,24 +941,24 @@ void ActiveMQConsumer::commit() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::rollback() {
- synchronized( internal->unconsumedMessages.get() ) {
+ synchronized( this->internal->unconsumedMessages.get() ) {
- synchronized( &internal->dispatchedMessages ) {
- if( internal->dispatchedMessages.empty() ) {
+ synchronized( &this->internal->dispatchedMessages ) {
+ if( this->internal->dispatchedMessages.empty() ) {
return;
}
// Only increase the redelivery delay after the first redelivery..
- Pointer<MessageDispatch> lastMsg =
internal->dispatchedMessages.front();
+ Pointer<MessageDispatch> lastMsg =
this->internal->dispatchedMessages.front();
const int currentRedeliveryCount =
lastMsg->getMessage()->getRedeliveryCounter();
if( currentRedeliveryCount > 0 ) {
- internal->redeliveryDelay =
this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
+ this->internal->redeliveryDelay =
this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
} else {
- internal->redeliveryDelay =
this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
+ this->internal->redeliveryDelay =
this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
}
Pointer<MessageId> firstMsgId =
-
internal->dispatchedMessages.back()->getMessage()->getMessageId();
+
this->internal->dispatchedMessages.back()->getMessage()->getMessageId();
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
@@ -966,15 +976,15 @@ void ActiveMQConsumer::rollback() {
ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( lastMsg->getDestination() );
- ack->setMessageCount( (int)internal->dispatchedMessages.size()
);
+ ack->setMessageCount(
(int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
ack->setFirstMessageId( firstMsgId );
session->oneway( ack );
// Adjust the window size.
- internal->additionalWindowSize =
- Math::max( 0, internal->additionalWindowSize -
(int)internal->dispatchedMessages.size() );
- internal->redeliveryDelay = 0;
+ this->internal->additionalWindowSize =
+ Math::max( 0, this->internal->additionalWindowSize -
(int)this->internal->dispatchedMessages.size() );
+ this->internal->redeliveryDelay = 0;
} else {
@@ -984,7 +994,7 @@ void ActiveMQConsumer::rollback() {
ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( lastMsg->getDestination() );
- ack->setMessageCount(
(int)internal->dispatchedMessages.size() );
+ ack->setMessageCount(
(int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId(
lastMsg->getMessage()->getMessageId() );
ack->setFirstMessageId( firstMsgId );
@@ -992,15 +1002,15 @@ void ActiveMQConsumer::rollback() {
}
// stop the delivery of messages.
- internal->unconsumedMessages->stop();
+ this->internal->unconsumedMessages->stop();
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
- internal->unconsumedMessages->enqueueFirst( iter->next() );
+ this->internal->unconsumedMessages->enqueueFirst(
iter->next() );
}
- if( internal->redeliveryDelay > 0 &&
!internal->unconsumedMessages->isClosed() ) {
+ if( internal->redeliveryDelay > 0 &&
!this->internal->unconsumedMessages->isClosed() ) {
// TODO
// Start up the delivery again a little later.
//scheduler.executeAfterDelay(new Runnable() {
@@ -1020,13 +1030,13 @@ void ActiveMQConsumer::rollback() {
}
}
- internal->deliveredCounter -=
(int)internal->dispatchedMessages.size();
- internal->dispatchedMessages.clear();
+ this->internal->deliveredCounter -=
(int)internal->dispatchedMessages.size();
+ this->internal->dispatchedMessages.clear();
}
}
if( this->internal->listener != NULL ) {
- session->redispatch( *internal->unconsumedMessages );
+ session->redispatch( *this->internal->unconsumedMessages );
}
}
@@ -1035,17 +1045,17 @@ void ActiveMQConsumer::dispatch( const P
try {
- synchronized( internal->unconsumedMessages.get() ) {
+ synchronized( this->internal->unconsumedMessages.get() ) {
clearMessagesInProgress();
if( this->internal->clearDispatchList ) {
// we are reconnecting so lets flush the in progress
// messages
- internal->clearDispatchList = false;
- internal->unconsumedMessages->clear();
+ this->internal->clearDispatchList = false;
+ this->internal->unconsumedMessages->clear();
}
- if( !internal->unconsumedMessages->isClosed() ) {
+ if( !this->internal->unconsumedMessages->isClosed() ) {
// Don't dispatch expired messages, ack it and then destroy it
if( dispatch->getMessage()->isExpired() ) {
@@ -1055,7 +1065,7 @@ void ActiveMQConsumer::dispatch( const P
return;
}
- synchronized( &internal->listenerMutex ) {
+ synchronized( &this->internal->listenerMutex ) {
// If we have a listener, send the message.
if( this->internal->listener != NULL &&
internal->unconsumedMessages->isRunning() ) {
@@ -1122,7 +1132,7 @@ void ActiveMQConsumer::checkClosed() con
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::iterate() {
- synchronized( &internal->listenerMutex ) {
+ synchronized( &this->internal->listenerMutex ) {
if( this->internal->listener != NULL ) {
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
Thu Oct 21 21:48:12 2010
@@ -146,6 +146,13 @@ namespace core{
void doClose();
/**
+ * Cleans up this object's internal resources.
+ *
+ * @throw ActiveMQException if an error occurs while performing the
operation.
+ */
+ void dispose();
+
+ /**
* Get the Consumer information for this consumer
* @return Reference to a Consumer Info Object
*/
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
Thu Oct 21 21:48:12 2010
@@ -98,8 +98,7 @@ void ActiveMQProducer::close() {
if( !this->isClosed() ) {
- this->session->removeProducer( this->producerInfo->getProducerId()
);
- this->closed = true;
+ dispose();
// Remove at the Broker Side, if this fails the producer has
already
// been removed from the session and connection objects so its safe
@@ -113,6 +112,15 @@ void ActiveMQProducer::close() {
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::dispose() {
+
+ if( !this->isClosed() ) {
+ this->session->removeProducer( this->producerInfo->getProducerId() );
+ this->closed = true;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQProducer::send( cms::Message* message ) {
try {
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
Thu Oct 21 21:48:12 2010
@@ -242,6 +242,14 @@ namespace core{
*/
virtual void onProducerAck( const commands::ProducerAck& ack );
+ /**
+ * Performs Producer object cleanup but doesn't attempt to send the
Remove command
+ * to the broker. Called when the parent resource is closed first to
avoid the message
+ * send and avoid any exceptions that might be thrown from an attempt
to send a remove
+ * command to a failed transport.
+ */
+ void dispose();
+
private:
// Checks for the closed state and throws if so.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
Thu Oct 21 21:48:12 2010
@@ -119,6 +119,53 @@ void ActiveMQSession::close() {
}
try {
+ doClose();
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::doClose() {
+
+ try {
+ dispose();
+
+ // Remove this session from the Broker.
+ Pointer<RemoveInfo> info( new RemoveInfo() );
+ info->setObjectId( this->sessionInfo->getSessionId() );
+ info->setLastDeliveredSequenceId( this->lastDeliveredSequenceId );
+ this->connection->oneway( info );
+ }
+ AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception,
activemq::exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::dispose() {
+
+ class Finalizer {
+ private:
+
+ ActiveMQSession* session;
+ ActiveMQConnection* connection;
+
+ public:
+
+ Finalizer(ActiveMQSession* session, ActiveMQConnection* connection) {
+ this->session = session;
+ this->connection = connection;
+ }
+
+ ~Finalizer() {
+ this->connection->removeSession(this->session);
+ this->session->closed = true;
+ }
+ };
+
+ try{
+
+ Finalizer final(this, this->connection);
// Stop the dispatch executor.
stop();
@@ -129,47 +176,40 @@ void ActiveMQSession::close() {
this->transaction->rollback();
}
- // Close all Consumers
+ // Dispose of all Consumers, the dispose method skips the RemoveInfo
command.
synchronized( &this->consumers ) {
std::vector<ActiveMQConsumer*> closables =
this->consumers.values();
for( std::size_t i = 0; i < closables.size(); ++i ) {
try{
- closables[i]->close();
+
closables[i]->setFailureError(this->connection->getFirstFailureError());
+ closables[i]->dispose();
+ this->lastDeliveredSequenceId =
+ Math::max( this->lastDeliveredSequenceId,
closables[i]->getLastDeliveredSequenceId() );
} catch( cms::CMSException& ex ){
/* Absorb */
}
}
}
- // Close all Producers
+ // Dispose of all Producers, the dispose method skips the RemoveInfo
command.
synchronized( &this->producers ) {
std::vector<ActiveMQProducer*> closables =
this->producers.values();
for( std::size_t i = 0; i < closables.size(); ++i ) {
try{
- closables[i]->close();
+ closables[i]->dispose();
} catch( cms::CMSException& ex ){
/* Absorb */
}
}
}
-
- // Now indicate that this session is closed.
- closed = true;
-
- // Remove this sessions from the connection
- this->connection->removeSession( this );
-
- // Remove this session from the Broker.
- Pointer<RemoveInfo> info( new RemoveInfo() );
- info->setObjectId( this->sessionInfo->getSessionId() );
- info->setLastDeliveredSequenceId( this->lastDeliveredSequenceId );
- this->connection->oneway( info );
}
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+ AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception,
activemq::exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -919,7 +959,7 @@ void ActiveMQSession::addConsumer( Activ
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::removeConsumer( const Pointer<ConsumerId>& consumerId,
long long lastDeliveredSequenceId ) {
+void ActiveMQSession::removeConsumer( const Pointer<ConsumerId>& consumerId ) {
try{
@@ -933,8 +973,6 @@ void ActiveMQSession::removeConsumer( co
// the Connection.
this->connection->removeDispatcher( consumerId );
this->consumers.remove( consumerId );
- this->lastDeliveredSequenceId =
- Math::max( this->lastDeliveredSequenceId,
lastDeliveredSequenceId );
}
}
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
Thu Oct 21 21:48:12 2010
@@ -364,12 +364,10 @@ namespace core{
*
* @param consumerId
* The ConsumerId of the MessageConsumer to remove from this
Session.
- * @param lastDeliveredSequenceId
- * The sequenceId of the last Message the consumer delivered.
*
* @throw ActiveMQException if an internal error occurs.
*/
- void removeConsumer( const Pointer<commands::ConsumerId>& consumerId,
long long lastDeliveredSequenceId = 0 );
+ void removeConsumer( const Pointer<commands::ConsumerId>& consumerId );
/**
* Adds a MessageProducer to this session registering it with the
Connection and store
@@ -447,6 +445,22 @@ namespace core{
*/
Pointer<commands::ProducerId> getNextProducerId();
+ /**
+ * Performs the actual Session close operations. This method is meant
for use
+ * by ActiveMQConnection, the connection object calls this when it has
been
+ * closed to skip some of the extraneous processing done by the client
level
+ * close method.
+ */
+ void doClose();
+
+ /**
+ * Cleans up the Session object's resources without attempting to send
the
+ * Remove command to the broker, this can be called from
ActiveMQConnection when
+ * it knows that the transport is down and the doClose method would
throw an
+ * exception when it attempts to send the Remove Command.
+ */
+ void dispose();
+
private:
/**