Author: nmittler
Date: Sat Feb 9 14:47:32 2008
New Revision: 620209
URL: http://svn.apache.org/viewvc?rev=620209&view=rev
Log:
AMQCPP-152 - Adding synchronous receive to CmsTemplate
Added:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h
Added: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h?rev=620209&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h
(added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h Sat
Feb 9 14:47:32 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+#define ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+
+#include <cms/MessageConsumer.h>
+
+namespace activemq {
+namespace cmsutil {
+
+ /**
+ * A cached message consumer contained within a pooled session.
+ */
+ class CachedConsumer : public cms::MessageConsumer {
+ private:
+
+ cms::MessageConsumer* consumer;
+
+ public:
+
+ CachedConsumer( cms::MessageConsumer* consumer ) {
+ this->consumer = consumer;
+ }
+
+ virtual ~CachedConsumer() {}
+
+ /**
+ * Does nothing - the real producer resource will be closed
+ * by the lifecycle manager.
+ */
+ virtual void close() throw( cms::CMSException ){
+ // Do nothing.
+ }
+
+ virtual cms::Message* receive() throw ( cms::CMSException ) {
+ return consumer->receive();
+ }
+
+ virtual cms::Message* receive( int millisecs ) throw (
cms::CMSException ) {
+ return consumer->receive(millisecs);
+ }
+
+ virtual cms::Message* receiveNoWait() throw ( cms::CMSException ) {
+ return consumer->receiveNoWait();
+ }
+
+ virtual void setMessageListener( cms::MessageListener* listener ) {
+ consumer->setMessageListener(listener);
+ }
+
+ virtual cms::MessageListener* getMessageListener() const {
+ return consumer->getMessageListener();
+ }
+
+ virtual std::string getMessageSelector() const
+ throw ( cms::CMSException ) {
+ return consumer->getMessageSelector();
+ }
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_*/
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h Sat
Feb 9 14:47:32 2008
@@ -24,7 +24,7 @@
namespace cmsutil {
/**
- * A cached message roducer contained within a pooled session.
+ * A cached message producer contained within a pooled session.
*/
class CachedProducer : public cms::MessageProducer {
private:
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp Sat
Feb 9 14:47:32 2008
@@ -242,6 +242,37 @@
}
////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
+ cms::Destination* dest,
+ const std::string& selector,
+ bool noLocal ) throw (cms::CMSException) {
+
+ try {
+
+ // If no destination was provided, resolve the default.
+ if( dest == NULL ) {
+ dest = resolveDefaultDestination(session);
+ }
+
+ cms::MessageConsumer* consumer = NULL;
+
+ // Try to use a cached consumer - requires that we're using a
+ // PooledSession
+ PooledSession* pooledSession = dynamic_cast<PooledSession*>(session);
+ if( pooledSession != NULL ) {
+ consumer = pooledSession->createCachedConsumer(dest, selector,
noLocal);
+ } else {
+ consumer = session->createConsumer(dest, selector, noLocal);
+ }
+
+ return consumer;
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( IllegalStateException, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroyProducer( cms::MessageProducer*& producer)
throw (cms::CMSException) {
@@ -266,21 +297,6 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
- cms::Destination* dest, const std::string& messageSelector)
- throw (cms::CMSException) {
-
- try {
- cms::MessageConsumer* consumer = session->createConsumer(dest,
- messageSelector,
- isNoLocal());
-
- return consumer;
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
void CmsTemplate::destroyConsumer( cms::MessageConsumer*& consumer)
throw (cms::CMSException) {
@@ -290,12 +306,17 @@
try {
- // Close the consumer, then destroy it.
+ // Close the producer, then destroy it.
consumer->close();
}
AMQ_CATCH_NOTHROW( cms::CMSException )
- delete consumer;
+ // Destroy if it's not a cached consumer.
+ CachedConsumer* cachedConsumer = dynamic_cast<CachedConsumer*>(consumer);
+ if( cachedConsumer == NULL ) {
+ delete consumer;
+ }
+
consumer = NULL;
}
@@ -437,7 +458,7 @@
throw (cms::CMSException, IllegalStateException) {
try {
- SenderExecutor senderExecutor(messageCreator, this);
+ SendExecutor senderExecutor(messageCreator, this);
execute(&senderExecutor);
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -451,7 +472,7 @@
throw (cms::CMSException, IllegalStateException) {
try {
- SenderExecutor senderExecutor(messageCreator, this);
+ SendExecutor senderExecutor(messageCreator, this);
execute(dest, &senderExecutor);
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -464,7 +485,7 @@
throw (cms::CMSException, IllegalStateException) {
try {
- SenderExecutor senderExecutor(messageCreator, this);
+ SendExecutor senderExecutor(messageCreator, this);
execute(destinationName, &senderExecutor);
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -507,4 +528,84 @@
}
}
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* CmsTemplate::doReceive(cms::MessageConsumer* consumer,
+ long long receiveTime )
+throw (cms::CMSException) {
+
+ cms::Message* message = NULL;
+
+ try {
+
+ if( consumer == NULL ) {
+ throw new ActiveMQException(__FILE__, __LINE__, "consumer is
NULL");
+ }
+
+ switch( receiveTime ) {
+ case RECEIVE_TIMEOUT_NO_WAIT: {
+ message = consumer->receiveNoWait();
+ break;
+ }
+ case RECEIVE_TIMEOUT_INDEFINITE_WAIT: {
+ message = consumer->receive();
+ break;
+ }
+ default: {
+ message = consumer->receive(receiveTime);
+ break;
+ }
+ }
+
+ } catch( ActiveMQException& e) {
+
+ e.setMark(__FILE__, __LINE__ );
+
+ // Destroy the message resource.
+ destroyMessage(message);
+
+ throw e;
+ }
+}
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::ReceiveExecutor::doInCms(cms::Session* session)
+ throw (cms::CMSException) {
+
+ cms::MessageConsumer* consumer = NULL;
+
+ try {
+
+ // Create the consumer resource.
+ consumer = parent->createConsumer(session, getDestination(session),
selector, noLocal);
+
+ // Receive the message.
+ message = parent->doReceive(consumer, receiveTime);
+
+ // Destroy the consumer resource.
+ parent->destroyConsumer(consumer);
+
+ } catch( ActiveMQException& e) {
+
+ e.setMark(__FILE__, __LINE__ );
+
+ // Destroy the message resource.
+ parent->destroyMessage(message);
+
+ // Destroy the consumer resource.
+ parent->destroyConsumer(consumer);
+
+ throw e;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Destination* CmsTemplate::ResolveReceiveExecutor::getDestination(
+ cms::Session* session )
+ throw (cms::CMSException) {
+
+ try {
+ return parent->resolveDestinationName(session, destinationName);
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h Sat Feb
9 14:47:32 2008
@@ -145,9 +145,9 @@
/**
* Session callback that sends to the given destination.
*/
- class SenderExecutor;
- friend class SenderExecutor;
- class SenderExecutor : public ProducerCallback {
+ class SendExecutor;
+ friend class SendExecutor;
+ class SendExecutor : public ProducerCallback {
private:
MessageCreator* messageCreator;
@@ -155,13 +155,13 @@
public:
- SenderExecutor( MessageCreator* messageCreator,
+ SendExecutor( MessageCreator* messageCreator,
CmsTemplate* parent) {
this->messageCreator = messageCreator;
this->parent = parent;
}
- virtual ~SenderExecutor() {}
+ virtual ~SendExecutor() {}
virtual void doInCms(cms::Session* session,
cms::MessageProducer* producer) throw (cms::CMSException) {
@@ -169,6 +169,75 @@
}
};
+ /**
+ * Session callback that receives from the given destination.
+ */
+ class ReceiveExecutor;
+ friend class ReceiveExecutor;
+ class ReceiveExecutor : public SessionCallback {
+ protected:
+
+ cms::Destination* destination;
+ std::string selector;
+ bool noLocal;
+ cms::Message* message;
+ CmsTemplate* parent;
+ long long receiveTime;
+
+ public:
+ ReceiveExecutor( CmsTemplate* parent,
+ cms::Destination* destination,
+ const std::string& selector,
+ bool noLocal,
+ long long receiveTime) {
+ this->parent = parent;
+ this->destination = destination;
+ this->selector = selector;
+ this->noLocal = noLocal;
+ this->receiveTime = receiveTime;
+ this->message = NULL;
+ }
+
+ virtual ~ReceiveExecutor() {}
+
+ virtual void doInCms(cms::Session* session)
+ throw (cms::CMSException);
+
+ virtual cms::Destination* getDestination(cms::Session* session
AMQCPP_UNUSED)
+ throw (cms::CMSException) {
+ return destination;
+ }
+ };
+
+ /**
+ * Session callback that executes a receive callback for a named
destination.
+ */
+ class ResolveReceiveExecutor;
+ friend class ResolveReceiveExecutor;
+ class ResolveReceiveExecutor : public ReceiveExecutor {
+ private:
+
+ std::string destinationName;
+
+ public:
+
+ ResolveReceiveExecutor(CmsTemplate* parent,
+ const std::string& selector,
+ bool noLocal,
+ long long receiveTime,
+ const std::string& destinationName)
+ :
+ ReceiveExecutor( parent, NULL, selector, noLocal, receiveTime)
{
+
+ this->destinationName = destinationName;
+ }
+
+ virtual ~ResolveReceiveExecutor() {}
+
+ virtual cms::Destination* getDestination(cms::Session* session)
+ throw (cms::CMSException);
+ };
+
private:
static const int NUM_SESSION_POOLS =
(int)cms::Session::SESSION_TRANSACTED + 1;
@@ -566,22 +635,22 @@
* @throws cms::CMSException thrown if the CMS methods throw.
*/
void destroyProducer( cms::MessageProducer*& producer ) throw
(cms::CMSException);
-
+
/**
* Allocates a consumer initialized with the proper values.
*
* @param session
* The session from which to create a consumer
* @param dest
- * The destination for which to create the consumer
- * @param messageSelector
- * The message selector for the consumer.
- * @return the new consumer
- * @throws cms::CMSException if the CMS methods throw.
+ * The destination for which to create the consumer. If
+ * this is NULL, the default will be used.
+ * @return the consumer
+ * @throws cms::CMSException thrown by the CMS API
*/
cms::MessageConsumer* createConsumer(cms::Session* session,
- cms::Destination* dest, const std::string& messageSelector)
- throw (cms::CMSException);
+ cms::Destination* dest,
+ const std::string& selector,
+ bool noLocal ) throw (cms::CMSException);
/**
* Closes and destroys a consumer resource
@@ -611,6 +680,18 @@
void doSend(cms::Session* session,
cms::MessageProducer* producer,
MessageCreator* messageCreator) throw (cms::CMSException);
+
+ /**
+ * Receives a message from a destination.
+ * @param consumer
+ * the consumer to receive from
+ * @param receiveTime
+ * the time to wait for the receive.
+ * @return the message that was read
+ * @throws cms::CMSException thrown if the CMS API throws.
+ */
+ cms::Message* doReceive(cms::MessageConsumer* consumer,
+ long long receiveTime ) throw (cms::CMSException);
/**
* Resolves the default destination and returns it.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp Sat
Feb 9 14:47:32 2008
@@ -33,11 +33,19 @@
////////////////////////////////////////////////////////////////////////////////
PooledSession::~PooledSession(){
+ // Destroy cached producers.
std::vector<CachedProducer*> cachedProducers = producerCache.getValues();
for( std::size_t ix = 0; ix < cachedProducers.size(); ++ix ) {
delete cachedProducers[ix];
}
cachedProducers.clear();
+
+ // Destroy cached consumers.
+ std::vector<CachedConsumer*> cachedConsumers = consumerCache.getValues();
+ for( std::size_t ix = 0; ix < cachedConsumers.size(); ++ix ) {
+ delete cachedConsumers[ix];
+ }
+ cachedConsumers.clear();
}
////////////////////////////////////////////////////////////////////////////////
@@ -59,12 +67,12 @@
throw ActiveMQException(__FILE__, __LINE__, "destination is NULL");
}
- std::string destName = getUniqueDestName(destination);
+ std::string key = getUniqueDestName(destination);
// Check the cache - add it if necessary.
CachedProducer* cachedProducer = NULL;
try {
- cachedProducer = producerCache.getValue(destName);
+ cachedProducer = producerCache.getValue(key);
} catch( decaf::lang::exceptions::NoSuchElementException& e ) {
// No producer exists for this destination - start by creating
@@ -78,7 +86,7 @@
cachedProducer = new CachedProducer(p);
// Add it to the cache.
- producerCache.setValue(destName, cachedProducer);
+ producerCache.setValue(key, cachedProducer);
}
return cachedProducer;
@@ -88,18 +96,65 @@
}
////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* PooledSession::createCachedConsumer(
+ const cms::Destination* destination,
+ const std::string& selector,
+ bool noLocal) throw ( cms::CMSException ) {
+
+ try {
+
+ if( destination == NULL ) {
+ throw ActiveMQException(__FILE__, __LINE__, "destination is NULL");
+ }
+
+ // Append the selector and noLocal flag onto the key.
+ std::string key = getUniqueDestName(destination);
+ key += "s=";
+ key += selector;
+ key += ",nl=";
+ key += (noLocal? "t" : "f");
+
+ // Check the cache - add it if necessary.
+ CachedConsumer* cachedConsumer = NULL;
+ try {
+ cachedConsumer = consumerCache.getValue(key);
+ } catch( decaf::lang::exceptions::NoSuchElementException& e ) {
+
+ // No producer exists for this destination - start by creating
+ // a new consumer resource.
+ cms::MessageConsumer* c = session->createConsumer(destination,
selector, noLocal);
+
+ // Add the consumer resource to the resource lifecycle manager.
+ pool->getResourceLifecycleManager()->addMessageConsumer(c);
+
+ // Create the cached consumer wrapper.
+ cachedConsumer = new CachedConsumer(c);
+
+ // Add it to the cache.
+ consumerCache.setValue(key, cachedConsumer);
+ }
+
+ return cachedConsumer;
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
std::string PooledSession::getUniqueDestName( const cms::Destination* dest ) {
- std::string destName;
+ std::string destName = "[";
const cms::Queue* queue = dynamic_cast<const cms::Queue*>(dest);
if( queue != NULL ) {
- destName = "q:" + queue->getQueueName();
+ destName += "q:" + queue->getQueueName();
} else {
const cms::Topic* topic = dynamic_cast<const cms::Topic*>(dest);
if( topic != NULL ) {
- destName = "t:" + topic->getTopicName();
+ destName += "t:" + topic->getTopicName();
}
}
+
+ destName += "]";
return destName;
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h Sat
Feb 9 14:47:32 2008
@@ -21,6 +21,7 @@
#include <cms/Session.h>
#include <decaf/util/Map.h>
#include <activemq/cmsutil/CachedProducer.h>
+#include <activemq/cmsutil/CachedConsumer.h>
namespace activemq {
namespace cmsutil {
@@ -41,6 +42,8 @@
decaf::util::Map<std::string, CachedProducer*> producerCache;
+ decaf::util::Map<std::string, CachedConsumer*> consumerCache;
+
public:
PooledSession( SessionPool* pool, cms::Session* session );
@@ -111,6 +114,25 @@
throw ( cms::CMSException ) {
return session->createDurableConsumer(destination, name, selector,
noLocal);
}
+
+ /**
+ * First checks the internal consumer cache and creates on if none
exist
+ * for the given destination, selector, noLocal. If created, the
consumer is
+ * added to the pool's lifecycle manager.
+ *
+ * @param destiation
+ * the destination to receive on
+ * @param selector
+ * the selector to use
+ * @param noLocal
+ * whether or not to receive messages from the same connection
+ * @return the consumer resource
+ * @throws cms::CMSException if something goes wrong.
+ */
+ virtual cms::MessageConsumer* createCachedConsumer(
+ const cms::Destination* destination,
+ const std::string& selector,
+ bool noLocal) throw ( cms::CMSException );
virtual cms::MessageProducer* createProducer( const cms::Destination*
destination )
throw ( cms::CMSException ) {