Alan Conway wrote:
Apologies for doing this right at the code freeze but I only realized 2
days ago that the AckPolicy API was broken. The new API is only
incompatible for code that uses AckPolicy, it provides full flexibility
for manual or automatic control over acquire and accept of subscribed
messages. It also makes Subscription a first-class object which provides
a single logical place to collect future changes to subscriptions.
I like the changes; a big improvement! A couple of points however:
* In moving from AckPolicy the automatic sending of completion status
has been lost. That is needed for window mode. (A message should only be
marked completed when processed in my view, i.e. after and not before
the listeners received call returns).
* I'm not sure that it makes sense to conflate acquire with acking on
autoAck. (If we need it, I think a separate autoAcquire setting would be
better).
* When acquiring a message you have to test the results before you can
be sure that the message was actually acquired.
Attached is a patch that addresses these points. Thoughts?
Index: src/qpid/client/SubscriptionImpl.cpp
===================================================================
--- src/qpid/client/SubscriptionImpl.cpp (revision 708480)
+++ src/qpid/client/SubscriptionImpl.cpp (working copy)
@@ -27,6 +27,7 @@
namespace client {
using sys::Mutex;
+using framing::MessageAcquireResult;
SubscriptionImpl::SubscriptionImpl(SubscriptionManager& m, const std::string& q, const SubscriptionSettings& s, const std::string& n, MessageListener* l)
: manager(m), name(n), queue(q), settings(s), listener(l)
@@ -68,16 +69,19 @@
void SubscriptionImpl::acquire(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
- manager.getSession().messageAcquire(messageIds);
- unacquired.remove(messageIds);
+ MessageAcquireResult result = manager.getSession().messageAcquire(messageIds);
+ unacquired.remove(result.getTransfers());
if (settings.acceptMode == ACCEPT_MODE_EXPLICIT)
- unaccepted.add(messageIds);
+ unaccepted.add(result.getTransfers());
}
void SubscriptionImpl::accept(const SequenceSet& messageIds) {
Mutex::ScopedLock l(lock);
manager.getSession().messageAccept(messageIds);
unaccepted.remove(messageIds);
+ if (settings.autoComplete) {
+ manager.getSession().sendCompletion();
+ }
}
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
@@ -88,7 +92,6 @@
void SubscriptionImpl::received(Message& m) {
Mutex::ScopedLock l(lock);
- manager.getSession().markCompleted(m.getId(), false, false);
if (m.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
unacquired.add(m.getId());
else if (m.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
@@ -99,15 +102,16 @@
listener->received(m);
}
+ if (settings.autoComplete) {
+ manager.getSession().markCompleted(m.getId(), false, false);
+ }
if (settings.autoAck) {
- if (unacquired.size() + unaccepted.size() >= settings.autoAck) {
- if (unacquired.size()) {
- async(manager.getSession()).messageAcquire(unacquired);
- unaccepted.add(unacquired);
- unaccepted.clear();
- }
+ if (unaccepted.size() >= settings.autoAck) {
async(manager.getSession()).messageAccept(unaccepted);
unaccepted.clear();
+ if (settings.autoComplete) {
+ manager.getSession().sendCompletion();
+ }
}
}
}
Index: src/qpid/client/FlowControl.h
===================================================================
--- src/qpid/client/FlowControl.h (revision 708480)
+++ src/qpid/client/FlowControl.h (working copy)
@@ -42,8 +42,9 @@
* is renewed.
*
* In "window mode" credit is automatically renewed when a message is
- * accepted. In non-window mode credit is not automatically renewed,
- * it must be explicitly re-set (@see Subscription)
+ * completed (which by default happens when it is accepted). In
+ * non-window mode credit is not automatically renewed, it must be
+ * explicitly re-set (@see Subscription)
*/
struct FlowControl {
static const uint32_t UNLIMITED=0xFFFFFFFF;
Index: src/qpid/client/SubscriptionSettings.h
===================================================================
--- src/qpid/client/SubscriptionSettings.h (revision 708480)
+++ src/qpid/client/SubscriptionSettings.h (working copy)
@@ -39,22 +39,33 @@
FlowControl flow=FlowControl::unlimited(),
AcceptMode accept=ACCEPT_MODE_EXPLICIT,
AcquireMode acquire=ACQUIRE_MODE_PRE_ACQUIRED,
- unsigned int autoAck_=1
- ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_) {}
+ unsigned int autoAck_=1,
+ bool autoComplete_=true
+ ) : flowControl(flow), acceptMode(accept), acquireMode(acquire), autoAck(autoAck_), autoComplete(autoComplete_) {}
FlowControl flowControl; ///@< Flow control settings. @see FlowControl
AcceptMode acceptMode; ///@< ACCEPT_MODE_EXPLICIT or ACCEPT_MODE_NONE
AcquireMode acquireMode; ///@< ACQUIRE_MODE_PRE_ACQUIRED or ACQUIRE_MODE_NOT_ACQUIRED
- /** Automatically acknowledge (acquire and accept) batches of autoAck messages.
- * 0 means no automatic acknowledgement. What it means to "acknowledge" a message depends on
- * acceptMode and acquireMode:
- * - ACCEPT_MODE_NONE and ACQUIRE_MODE_PRE_ACQUIRED: do nothing
- * - ACCEPT_MODE_NONE and ACQUIRE_MODE_NOT_ACQUIRED: send an "acquire" command
- * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_PRE_ACQUIRED: send "accept" command
- * - ACCEPT_MODE_EXPLICIT and ACQUIRE_MODE_NOT_ACQUIRED: send "acquire" and "accept" commands
+ /** Automatically acknowledge (accept) batches of autoAck
+ * messages. 0 means no automatic acknowledgement. This has no
+ * effect for messsages received for a subscription with
+ * ACCEPT_MODE_NODE.*/
+ unsigned int autoAck;
+ /**
+ * If set to true, messages will be marked as completed (in
+ * windowing mode, completion of a message will cause the credit
+ * used up by that message to be reallocated) once they have been
+ * received. The server will be explicitly notified of all
+ * completed messages when the next accept is sent through the
+ * subscription (either explictly or through autAck). However the
+ * server may also periodically request information on the
+ * completed messages.
+ *
+ * If set to false the application is responsible for completing
+ * messages (@see Session::markCompleted()).
*/
- unsigned int autoAck;
+ bool autoComplete;
};
}} // namespace qpid::client
Index: src/tests/txtest.cpp
===================================================================
--- src/tests/txtest.cpp (revision 708540)
+++ src/tests/txtest.cpp (working copy)
@@ -51,11 +51,12 @@
uint txCount;
uint totalMsgCount;
bool dtx;
+ bool quiet;
Args() : init(true), transfer(true), check(true),
size(256), durable(true), queues(2),
base("tx-test"), msgsPerTx(1), txCount(1), totalMsgCount(10),
- dtx(false)
+ dtx(false), quiet(false)
{
addOptions()
@@ -69,7 +70,8 @@
("messages-per-tx", optValue(msgsPerTx, "N"), "number of messages transferred per transaction")
("tx-count", optValue(txCount, "N"), "number of transactions per 'agent'")
("total-messages", optValue(totalMsgCount, "N"), "total number of messages in 'circulation'")
- ("dtx", optValue(dtx, "yes|no"), "use distributed transactions");
+ ("dtx", optValue(dtx, "yes|no"), "use distributed transactions")
+ ("quiet", optValue(quiet), "reduce output from test");
}
};
@@ -159,7 +161,6 @@
session.messageTransfer(arg::content=out, arg::acceptMode=1);
}
sub.accept(sub.getUnaccepted());
- session.sendCompletion();
if (opts.dtx) {
session.dtxEnd(arg::xid=xid);
session.dtxPrepare(arg::xid=xid);
@@ -219,7 +220,7 @@
StringSet::iterator next = i + 1;
if (next == queues.end()) next = queues.begin();
- std::cout << "Transfering from " << *i << " to " << *next << std::endl;
+ if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl;
agents.push_back(new Transfer(*i, *next));
agents.back().thread = Thread(agents.back());
}
@@ -241,13 +242,13 @@
xidArr.collect(inDoubtXids);
if (inDoubtXids.size()) {
- std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl;
+ if (!opts.quiet) std::cout << "Recovering DTX in-doubt transaction(s):" << std::endl;
framing::StructHelper decoder;
framing::Xid xid;
// abort even, commit odd transactions
for (unsigned i = 0; i < inDoubtXids.size(); i++) {
decoder.decode(xid, inDoubtXids[i]);
- std::cout << (i%2 ? " * aborting " : " * committing ");
+ if (!opts.quiet) std::cout << (i%2 ? " * aborting " : " * committing ");
xid.print(std::cout);
std::cout << std::endl;
if (i%2) {
@@ -276,7 +277,7 @@
drained.push_back(m.getMessageProperties().getCorrelationId());
++count;
}
- std::cout << "Drained " << count << " messages from " << *i << std::endl;
+ if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
}
sort(ids.begin(), ids.end());
Index: src/tests/quick_txtest
===================================================================
--- src/tests/quick_txtest (revision 0)
+++ src/tests/quick_txtest (revision 0)
@@ -0,0 +1,2 @@
+#!/bin/sh
+exec `dirname $0`/run_test ./txtest --queues 4 --tx-count 10 --quiet
Property changes on: src/tests/quick_txtest
___________________________________________________________________
Name: svn:executable
+ *
Index: src/tests/Makefile.am
===================================================================
--- src/tests/Makefile.am (revision 708480)
+++ src/tests/Makefile.am (working copy)
@@ -134,7 +134,7 @@
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
-system_tests = client_test quick_perftest quick_topictest run_header_test
+system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests
EXTRA_DIST += \
@@ -142,6 +142,7 @@
run-unit-tests start_broker python_tests stop_broker \
quick_topictest \
quick_perftest \
+ quick_txtest \
topictest \
run_header_test \
header_test.py \