Author: aconway Date: Fri Feb 17 14:04:43 2012 New Revision: 1245479 URL: http://svn.apache.org/viewvc?rev=1245479&view=rev Log: QPID-3603: Move broker::ReplicatingSubscription to ha namespace and ha plugin.
Added: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h (with props) qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (contents, props changed) - copied, changed from r1245478, qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (contents, props changed) - copied, changed from r1245478, qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h Removed: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am Fri Feb 17 14:04:43 2012 @@ -542,6 +542,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Consumer.h \ qpid/broker/Credit.h \ qpid/broker/Credit.cpp \ + qpid/broker/ConsumerFactory.h \ qpid/broker/Daemon.cpp \ qpid/broker/Daemon.h \ qpid/broker/Deliverable.h \ @@ -629,8 +630,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueuedMessage.h \ qpid/broker/QueueFlowLimit.h \ qpid/broker/QueueFlowLimit.cpp \ - 
qpid/broker/ReplicatingSubscription.h \ - qpid/broker/ReplicatingSubscription.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RecoverableConfig.h \ qpid/broker/RecoverableExchange.h \ Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk Fri Feb 17 14:04:43 2012 @@ -31,6 +31,8 @@ ha_la_SOURCES = \ qpid/ha/Settings.h \ qpid/ha/QueueReplicator.h \ qpid/ha/QueueReplicator.cpp \ + qpid/ha/ReplicatingSubscription.h \ + qpid/ha/ReplicatingSubscription.cpp \ qpid/ha/WiringReplicator.cpp \ qpid/ha/WiringReplicator.h Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h Fri Feb 17 14:04:43 2012 @@ -37,6 +37,7 @@ #include "qpid/broker/Vhost.h" #include "qpid/broker/System.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/ConsumerFactory.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -198,6 +199,7 @@ public: bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; + ConsumerFactories consumerFactories; public: virtual ~Broker(); @@ -356,6 +358,8 @@ public: const std::string& key, const std::string& userId, const std::string& connectionId); + + ConsumerFactories& getConsumerFactories() { return consumerFactories; } }; }} Modified: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Consumer.h Fri Feb 17 14:04:43 2012 @@ -31,6 +31,9 @@ namespace broker { class Queue; class QueueListeners; +/** + * Base class for consumers which represent a subscription to a queue. + */ class Consumer { const bool acquires; // inListeners allows QueueListeners to efficiently track if this instance is registered Added: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h?rev=1245479&view=auto ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h (added) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h Fri Feb 17 14:04:43 2012 @@ -0,0 +1,70 @@ +#ifndef QPID_BROKER_CONSUMERFACTORY_H +#define QPID_BROKER_CONSUMERFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public. +// Refactor to use a more abstract interface. + +#include "qpid/broker/SemanticState.h" + +namespace qpid { +namespace broker { + +/** + * Base class for consumer factories. Plugins can register a + * ConsumerFactory via Broker::getConsumerFactories(). Each time a + * consumer is created, each factory is tried in turn till one returns + * non-0. + */ +class ConsumerFactory +{ + public: + virtual ~ConsumerFactory() {} + + virtual boost::shared_ptr<SemanticState::ConsumerImpl> create( + SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, bool acquire, bool exclusive, const std::string& tag, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments) = 0; +}; + +/** A set of factories held by the broker + * THREAD UNSAFE: see notes on member functions. + */ +class ConsumerFactories { + public: + typedef std::vector<boost::shared_ptr<ConsumerFactory> > Factories; + + /** Thread safety: May only be called during plug-in initialization. */ + void add(const boost::shared_ptr<ConsumerFactory>& cf) { factories.push_back(cf); } + + /** Thread safety: May only be called after plug-in initialization. 
*/ + const Factories& get() const { return factories; } + + private: + Factories factories; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONSUMERFACTORY_H*/ Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConsumerFactory.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Feb 17 14:04:43 2012 @@ -37,7 +37,7 @@ DeliveryRecord::DeliveryRecord(const Que bool _acquired, bool accepted, bool _windowing, - uint32_t _credit, bool _delayedCompletion) : msg(_msg), + uint32_t _credit, bool _isDelayedCompletion) : msg(_msg), queue(_queue), tag(_tag), acquired(_acquired), @@ -47,7 +47,7 @@ DeliveryRecord::DeliveryRecord(const Que ended(accepted && acquired), windowing(_windowing), credit(msg.payload ? 
msg.payload->getRequiredCredit() : _credit), - delayedCompletion(_delayedCompletion) + isDelayedCompletion(_isDelayedCompletion) {} bool DeliveryRecord::setEnded() @@ -115,7 +115,7 @@ bool DeliveryRecord::accept(TransactionC if (!ended) { if (acquired) { queue->dequeue(ctxt, msg); - } else if (delayedCompletion) { + } else if (isDelayedCompletion) { //TODO: this is a nasty way to do this; change it msg.payload->getIngressCompletion().finishCompleter(); QPID_LOG(debug, "Completed " << msg.payload.get()); Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Feb 17 14:04:43 2012 @@ -63,7 +63,7 @@ class DeliveryRecord * after that). */ uint32_t credit; - bool delayedCompletion; + bool isDelayedCompletion; public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, @@ -73,7 +73,7 @@ class DeliveryRecord bool accepted, bool windowing, uint32_t credit=0, // Only used if msg is empty. 
- bool delayedCompletion=false + bool isDelayedCompletion=false ); bool coveredBy(const framing::SequenceSet* const range) const { return range->contains(id); } Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Feb 17 14:04:43 2012 @@ -25,9 +25,7 @@ #include "qpid/broker/DtxAck.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" -#include "qpid/ha/WiringReplicator.h" #include "qpid/broker/Queue.h" -#include "qpid/broker/ReplicatingSubscription.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" @@ -108,15 +106,25 @@ bool SemanticState::exists(const string& namespace { const std::string SEPARATOR("::"); } - + void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, - bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) + bool exclusive, const string& resumeId, uint64_t resumeTtl, + const FieldTable& arguments) { // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). 
// Create a globally unique name so the broker can identify individual consumers std::string name = session.getSessionId().str() + SEPARATOR + tag; - ConsumerImpl::shared_ptr c(ConsumerImpl::create(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); + const ConsumerFactories::Factories& cf( + session.getBroker().getConsumerFactories().get()); + ConsumerImpl::shared_ptr c; + for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != cf.end() && !c; ++i) + c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments); + if (!c) // Create plain consumer + c = ConsumerImpl::shared_ptr( + new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, + resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } @@ -266,26 +274,6 @@ void SemanticState::record(const Deliver const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::shared_ptr SemanticState::ConsumerImpl::create(SemanticState* parent, - const string& name, - Queue::shared_ptr queue, - bool ack, - bool acquire, - bool exclusive, - const string& tag, - const string& resumeId, - uint64_t resumeTtl, - const framing::FieldTable& arguments) -{ - if (arguments.isSet("qpid.replicating-subscription")) { - shared_ptr result(new ReplicatingSubscription(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - boost::dynamic_pointer_cast<ReplicatingSubscription>(result)->init(); - return result; - } else { - return shared_ptr(new ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - } -} - SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, @@ -297,7 +285,6 @@ SemanticState::ConsumerImpl::ConsumerImp uint64_t _resumeTtl, const framing::FieldTable& _arguments - ) : Consumer(_name, _acquire), parent(_parent), 
@@ -354,7 +341,7 @@ bool SemanticState::ConsumerImpl::delive { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, dynamic_cast<const ReplicatingSubscription*>(this)); + DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), acquire, !ackExpected, credit.isWindowMode(), 0, isDelayedCompletion()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1245479&r1=1245478&r2=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.h Fri Feb 17 14:04:43 2012 @@ -153,11 +153,10 @@ class SemanticState : private boost::non management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - static shared_ptr create(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, - bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - + /** This consumer wants delayed completion. + * Overridden by ConsumerImpl subclasses. 
+ */ + virtual bool isDelayedCompletion() const { return false; } }; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; Copied: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (from r1245478, qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp) URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?p2=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp&p1=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp&r1=1245478&r2=1245479&rev=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Feb 17 14:04:43 2012 @@ -20,18 +20,19 @@ */ #include "ReplicatingSubscription.h" -#include "Queue.h" +#include "qpid/broker/Queue.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" namespace qpid { -namespace broker { +namespace ha { using namespace framing; +using namespace broker; const std::string DOLLAR("$"); -const std::string INTERNAL("_internall"); +const std::string INTERNAL("_internal"); class ReplicationStateInitialiser { Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (from r1245478, qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h) URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?p2=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h&p1=qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h&r1=1245478&r2=1245479&rev=1245479&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ReplicatingSubscription.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Feb 17 14:04:43 2012 @@ -23,35 +23,48 @@ */ #include "qpid/broker/SemanticState.h" +#include "qpid/broker/QueueObserver.h" namespace qpid { + namespace broker { +class Message; +class Queue; +class QueuedMessage; +class OwnershipToken; +} + +namespace ha { /** * Subscriber to a remote queue that replicates to a local queue. */ -class ReplicatingSubscription : public SemanticState::ConsumerImpl, public QueueObserver +class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, + public broker::QueueObserver { public: - ReplicatingSubscription(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, + ReplicatingSubscription(broker::SemanticState* parent, + const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, - const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); + const std::string& resumeId, uint64_t resumeTtl, + const framing::FieldTable& arguments); ~ReplicatingSubscription(); void init(); void cancel(); - bool deliver(QueuedMessage& msg); - void enqueued(const QueuedMessage&); - void dequeued(const QueuedMessage&); - void acquired(const QueuedMessage&) {} - void requeued(const QueuedMessage&) {} + bool deliver(broker::QueuedMessage& msg); + void enqueued(const broker::QueuedMessage&); + void dequeued(const broker::QueuedMessage&); + void acquired(const broker::QueuedMessage&) 
{} + void requeued(const broker::QueuedMessage&) {} + + bool isDelayedCompletion() const { return true; } protected: bool doDispatch(); private: - boost::shared_ptr<Queue> events; - boost::shared_ptr<Consumer> consumer; + boost::shared_ptr<broker::Queue> events; + boost::shared_ptr<broker::Consumer> consumer; qpid::framing::SequenceSet range; void generateDequeueEvent(); @@ -60,7 +73,7 @@ class ReplicatingSubscription : public S public: DelegatingConsumer(ReplicatingSubscription&); ~DelegatingConsumer(); - bool deliver(QueuedMessage& msg); + bool deliver(broker::QueuedMessage& msg); void notify(); bool filter(boost::intrusive_ptr<Message>); bool accept(boost::intrusive_ptr<Message>); Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h ------------------------------------------------------------------------------ svn:keywords = Rev Date --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org