It seems that email attachments are not allowed on this list, so I have put
the diff directly below.
Daniel Laügt.
---
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
2011/10/19 10:02:31 146083
+++
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
2011/10/19 10:17:24 146084
@@ -54,9 +54,11 @@
// Either we need to implement something similar to
LinkedHashMap or find
// some other way of tracking the eldest entry into the map and
removing it
// if the cache size is exceeded.
- ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
+ ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
MessageId::COMPARATOR > messageCache;
+ ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache;
+
bool trackTransactions;
bool restoreSessions;
bool restoreConsumers;
@@ -122,6 +124,8 @@
virtual Pointer<Command> processEndTransaction( TransactionInfo* info
);
+ virtual Pointer<Command> processMessagePull( MessagePull* pull );
+
bool isRestoreConsumers() const {
return this->restoreConsumers;
}
---
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
2011/10/19 10:02:31 146083
+++
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
2011/10/19 10:17:24 146084
@@ -108,11 +108,21 @@
void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
try{
- if( trackMessages && command != NULL && command->isMessage() ) {
- Pointer<Message> message =
+ if( command != NULL ) {
+ if( trackMessages && command->isMessage() ) {
+ Pointer<Message> message =
command.dynamicCast<Message>();
- if( message->getTransactionId() == NULL ) {
+ if( message->getTransactionId() == NULL ) {
currentCacheSize = currentCacheSize + message->getSize();
+ }
+ }
+ else {
+ Pointer<MessagePull> messagePull =
+ command.dynamicCast<MessagePull>();
+ if( messagePull != NULL ) {
+ // just needs to be a rough estimate of size, ~4 identifiers
+ currentCacheSize += 400;
+ }
}
}
}
@@ -148,12 +158,19 @@
}
// Now we flush messages
- std::vector< Pointer<Message> > messages = messageCache.values();
- std::vector< Pointer<Message> >::const_iterator messageIter =
messages.begin();
+ std::vector< Pointer<Command> > messages = messageCache.values();
+ std::vector< Pointer<Command> >::const_iterator messageIter =
messages.begin();
for( ; messageIter != messages.end(); ++messageIter ) {
transport->oneway( *messageIter );
}
+
+ std::vector< Pointer<Command> > messagePulls =
messagePullCache.values();
+ std::vector< Pointer<Command> >::const_iterator messagePullIter =
messagePulls.begin();
+
+ for( ; messagePullIter != messagePulls.end(); ++messagePullIter ) {
+ transport->oneway( *messagePullIter );
+ }
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -790,6 +807,19 @@
}
////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull* pull
) {
+ if( pull != NULL
+ && pull->getDestination() != NULL
+ && pull->getConsumerId() != NULL) {
+ std::string id = pull->getDestination()->toString() + "::" +
pull->getConsumerId()->toString();
+ messagePullCache.put( id,
+ Pointer<Command>( pull->cloneDataStructure() ) );
+ }
+
+ return Pointer<Command>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ConnectionStateTracker::connectionInterruptProcessingComplete(
transport::Transport* transport, const Pointer<ConnectionId>& connectionId
) {
From: Daniel Laugt [mailto:[email protected]]
Sent: 08 November 2011 13:16
To: [email protected]
Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if
the MessagePull command is lost
Hello,
I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as the client. With the prefetch
size configured to 0, a polling consumer fails to reconnect during failover.
This issue has been fixed under JIRA item AMQ-2877:
https://issues.apache.org/jira/browse/AMQ-2877
AMQ-2877 fixes the problem on the Java client side but not on the C++ client
side. Would it be possible to merge this fix into ActiveMQ-CPP?
Attached to this email is a diff of what I merged from AMQ-2877 to resolve the
problem in my copy of ActiveMQ-CPP. This diff can probably be used as a
suggestion.
Regards,
Daniel Laügt.