Author: gsim
Date: Tue Oct 15 12:42:01 2013
New Revision: 1532308
URL: http://svn.apache.org/r1532308
Log:
QPID-5232: make subscriptions unreliable and autodeleted by default
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1532308&r1=1532307&r2=1532308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Tue Oct 15 12:42:01
2013
@@ -45,6 +45,17 @@ void Outgoing::wakeup()
session.wakeup();
}
+namespace {
+bool requested_reliable(pn_link_t* link)
+{
+ return pn_link_remote_snd_settle_mode(link) == PN_SND_UNSETTLED;
+}
+bool requested_unreliable(pn_link_t* link)
+{
+ return pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
+}
+}
+
OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string&
source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l,
Session& session,
qpid::sys::OutputControl& o,
SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
@@ -54,7 +65,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/,
- unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
+ //for exclusive queues, assume unreliable unless reliable is explicitly
requested; otherwise assume reliable unless unreliable requested
+ unreliable(exclusive ? !requested_reliable(link) :
requested_unreliable(link))
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -106,8 +118,8 @@ void OutgoingFromQueue::handle(pn_delive
write(&buffer[0], encoder.getPosition());
Translation t(r.msg);
t.write(*this);
- if (unreliable) pn_delivery_settle(delivery);
if (pn_link_advance(link)) {
+ if (unreliable) pn_delivery_settle(delivery);
--outstanding;
outgoingMessageSent();
QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from "
<< queue->getName() << ", index=" << r.index);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1532308&r1=1532307&r2=1532308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Oct 15 12:42:01
2013
@@ -396,12 +396,13 @@ void Session::setupOutgoing(pn_link_t* l
authorise.access(node.exchange);//do separate access check before
trying to create the queue
bool shared = is_capability_requested(SHARED,
pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
- QueueSettings settings(durable, !durable);
+ bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) ==
PN_SND_SETTLED;
+ QueueSettings settings(durable, autodelete);
std::string altExchange;
if (node.topic) {
settings = node.topic->getPolicy();
settings.durable = durable;
- settings.autodelete = !durable;
+ settings.autodelete = autodelete;
altExchange = node.topic->getAlternateExchange();
}
settings.autoDeleteDelay = pn_terminus_get_timeout(source);
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1532308&r1=1532307&r2=1532308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Tue Oct 15
12:42:01 2013
@@ -812,6 +812,13 @@ void AddressHelper::configure(pn_link_t*
}
if (isUnreliable()) {
pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+ } else if (!reliability.empty()) {
+ if (reliability == EXACTLY_ONCE ) {
+ QPID_LOG(warning, "Unsupported reliability mode: " << reliability);
+ } else if (reliability != AT_LEAST_ONCE ) {
+ QPID_LOG(warning, "Unrecognised reliability mode: " <<
reliability);
+ }
+ pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]