Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp Tue Mar 3 14:58:01 2015 @@ -20,34 +20,53 @@ */ #include "PnData.h" #include "qpid/types/encodings.h" +#include "qpid/log/Statement.h" namespace qpid { namespace messaging { namespace amqp { using types::Variant; +using namespace types::encodings; -void PnData::write(const Variant::Map& map) +// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder. +// Collapse them all into a single proton-based codec. + +void PnData::put(const Variant::Map& map) { pn_data_put_map(data); pn_data_enter(data); for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) { - pn_data_put_string(data, str(i->first)); - write(i->second); + pn_data_put_string(data, bytes(i->first)); + put(i->second); } pn_data_exit(data); } -void PnData::write(const Variant::List& list) + +void PnData::put(const Variant::List& list) { pn_data_put_list(data); pn_data_enter(data); for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { - write(*i); + put(*i); } pn_data_exit(data); } -void PnData::write(const Variant& value) + +void PnData::put(const Variant& value) { + // Open data descriptors associated with the value. 
+ const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + pn_data_put_described(data); + pn_data_enter(data); + if (i->getType() == types::VAR_STRING) + pn_data_put_symbol(data, bytes(i->asString())); + else + pn_data_put_ulong(data, i->asUint64()); + } + + // Put the variant value switch (value.getType()) { case qpid::types::VAR_VOID: pn_data_put_null(data); @@ -65,61 +84,70 @@ void PnData::write(const Variant& value) pn_data_put_double(data, value.asDouble()); break; case qpid::types::VAR_STRING: - pn_data_put_string(data, str(value.asString())); + if (value.getEncoding() == ASCII) + pn_data_put_symbol(data, bytes(value.asString())); + else if (value.getEncoding() == BINARY) + pn_data_put_binary(data, bytes(value.asString())); + else + pn_data_put_string(data, bytes(value.asString())); break; case qpid::types::VAR_MAP: - write(value.asMap()); + put(value.asMap()); break; case qpid::types::VAR_LIST: - write(value.asList()); + put(value.asList()); break; default: break; } + + // Close any descriptors. 
+ for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) + pn_data_exit(data); } -bool PnData::read(qpid::types::Variant& value) +bool PnData::get(qpid::types::Variant& value) { - return read(pn_data_type(data), value); + return get(pn_data_type(data), value); } -void PnData::readList(qpid::types::Variant::List& value) +void PnData::getList(qpid::types::Variant::List& value) { size_t count = pn_data_get_list(data); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { qpid::types::Variant e; - if (read(e)) value.push_back(e); + if (get(e)) value.push_back(e); } pn_data_exit(data); } -void PnData::readMap(qpid::types::Variant::Map& value) +void PnData::getMap(qpid::types::Variant::Map& value) { size_t count = pn_data_get_list(data); pn_data_enter(data); for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) { - std::string key = str(pn_data_get_symbol(data)); + std::string key = string(pn_data_get_symbol(data)); pn_data_next(data); qpid::types::Variant e; - if (read(e)) value[key]= e; + if (get(e)) value[key]= e; } pn_data_exit(data); } -void PnData::readArray(qpid::types::Variant::List& value) +void PnData::getArray(qpid::types::Variant::List& value) { size_t count = pn_data_get_array(data); pn_type_t type = pn_data_get_array_type(data); pn_data_enter(data); for (size_t i = 0; i < count && pn_data_next(data); ++i) { qpid::types::Variant e; - if (read(type, e)) value.push_back(e); + if (get(type, e)) value.push_back(e); } pn_data_exit(data); } -bool PnData::read(pn_type_t type, qpid::types::Variant& value) +bool PnData::get(pn_type_t type, qpid::types::Variant& value) { switch (type) { case PN_NULL: @@ -168,41 +196,41 @@ bool PnData::read(pn_type_t type, qpid:: value = qpid::types::Uuid(pn_data_get_uuid(data).bytes); return true; case PN_BINARY: - value = str(pn_data_get_binary(data)); + value = string(pn_data_get_binary(data)); value.setEncoding(qpid::types::encodings::BINARY); return true; case 
PN_STRING: - value = str(pn_data_get_string(data)); + value = string(pn_data_get_string(data)); value.setEncoding(qpid::types::encodings::UTF8); return true; case PN_SYMBOL: - value = str(pn_data_get_string(data)); + value = string(pn_data_get_string(data)); value.setEncoding(qpid::types::encodings::ASCII); return true; case PN_LIST: value = qpid::types::Variant::List(); - readList(value.asList()); + getList(value.asList()); return true; break; case PN_MAP: value = qpid::types::Variant::Map(); - readMap(value.asMap()); + getMap(value.asMap()); return true; case PN_ARRAY: value = qpid::types::Variant::List(); - readArray(value.asList()); + getArray(value.asList()); return true; case PN_DESCRIBED: + // TODO aconway 2014-11-20: get described values. case PN_DECIMAL32: case PN_DECIMAL64: case PN_DECIMAL128: default: return false; } - } -pn_bytes_t PnData::str(const std::string& s) +pn_bytes_t PnData::bytes(const std::string& s) { pn_bytes_t result; result.start = const_cast<char*>(s.data()); @@ -210,7 +238,7 @@ pn_bytes_t PnData::str(const std::string return result; } -std::string PnData::str(const pn_bytes_t& in) +std::string PnData::string(const pn_bytes_t& in) { return std::string(in.start, in.size); }
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h Tue Mar 3 14:58:01 2015 @@ -32,28 +32,29 @@ namespace messaging { namespace amqp { /** - * Helper class to read/write messaging types to/from pn_data_t. + * Helper class to put/get messaging types to/from pn_data_t. */ class PnData { public: - PnData(pn_data_t* d) : data(d) {} + pn_data_t* data; - void write(const types::Variant& value); - void write(const types::Variant::Map& map); - void write(const types::Variant::List& list); - - bool read(pn_type_t type, types::Variant& value); - bool read(types::Variant& value); - void readList(types::Variant::List& value); - void readMap(types::Variant::Map& value); - void readArray(types::Variant::List& value); + PnData(pn_data_t* d) : data(d) {} - static pn_bytes_t str(const std::string&); - static std::string str(const pn_bytes_t&); + void put(const types::Variant& value); + void put(const types::Variant::Map& map); + void put(const types::Variant::List& list); + void put(int32_t n) { pn_data_put_int(data, n); } + void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); } + + bool get(pn_type_t type, types::Variant& value); + bool get(types::Variant& value); + void getList(types::Variant::List& value); + void getMap(types::Variant::Map& value); + void getArray(types::Variant::List& value); - private: - pn_data_t* data; + static pn_bytes_t bytes(const std::string&); + static std::string string(const pn_bytes_t&); }; }}} // namespace messaging::amqp Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Tue Mar 3 14:58:01 2015 @@ -37,9 +37,10 @@ ReceiverContext::ReceiverContext(pn_sess helper(address), receiver(pn_receiver(session, name.c_str())), capacity(0), used(0) {} + ReceiverContext::~ReceiverContext() { - pn_link_free(receiver); + if (receiver) pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) @@ -63,12 +64,13 @@ uint32_t ReceiverContext::getAvailable() uint32_t ReceiverContext::getUnsettled() { + assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver)); return pn_link_unsettled(receiver) - pn_link_queued(receiver); } void ReceiverContext::close() { - pn_link_close(receiver); + if (receiver) pn_link_close(receiver); } const std::string& ReceiverContext::getName() const @@ -96,7 +98,7 @@ void ReceiverContext::verify() } void ReceiverContext::configure() { - configure(pn_link_source(receiver)); + if (receiver) configure(pn_link_source(receiver)); } void ReceiverContext::configure(pn_terminus_t* source) { @@ -116,13 +118,13 @@ Address ReceiverContext::getAddress() co void ReceiverContext::reset(pn_session_t* session) { - receiver = pn_receiver(session, name.c_str()); - configure(); + receiver = session ? 
pn_receiver(session, name.c_str()) : 0; + if (receiver) configure(); } bool ReceiverContext::hasCurrent() { - return pn_link_current(receiver); + return receiver && pn_link_current(receiver); } bool ReceiverContext::wakeupToIssueCredit() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue Mar 3 14:58:01 2015 @@ -18,8 +18,10 @@ * under the License. * */ -#include "qpid/messaging/amqp/SenderContext.h" -#include "qpid/messaging/amqp/EncodedMessage.h" +#include "SenderContext.h" +#include "Transaction.h" +#include "EncodedMessage.h" +#include "PnData.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/exceptions.h" #include "qpid/Exception.h" @@ -40,22 +42,29 @@ extern "C" { namespace qpid { namespace messaging { namespace amqp { + //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_) - : name(n), +SenderContext::SenderContext(pn_session_t* session, const std::string& n, + const qpid::messaging::Address& a, + bool setToOnSend_, + const CoordinatorPtr& coord) + : sender(pn_sender(session, n.c_str())), + name(n), address(a), helper(address), - sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()), - setToOnSend(setToOnSend_) {} + nextId(0), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_), + transaction(coord) +{} SenderContext::~SenderContext() { - pn_link_free(sender); + 
if (sender) pn_link_free(sender); } void SenderContext::close() { - pn_link_close(sender); + if (sender) pn_link_close(sender); } void SenderContext::setCapacity(uint32_t c) @@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::mes { resend();//if there are any messages needing to be resent at the front of the queue, send them first if (processUnsettled(false) < capacity && pn_link_credit(sender)) { + types::Variant state; + if (transaction) + state = transaction->getSendState(); if (unreliable) { Delivery delivery(nextId++); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = 0; return true; } else { @@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::mes try { Delivery& delivery = deliveries.back(); delivery.encode(MessageImplAccess::get(message), address, setToOnSend); - delivery.send(sender, unreliable); + delivery.send(sender, unreliable, state); *out = &delivery; return true; } catch (const std::exception& e) { @@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(con throw SendError(e.what()); } } -void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) + +void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state) { pn_delivery_tag_t tag; tag.size = sizeof(id); @@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_li tag.bytes = reinterpret_cast<const char*>(&id); #endif token = pn_delivery(sender, tag); + if (!state.isVoid()) { // Add transaction state + PnData data(pn_disposition_data(pn_delivery_local(token))); + data.put(state); + pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE); + } pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { pn_delivery_settle(token); @@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected() { return pn_delivery_remote_state(token) == PN_REJECTED; } + +std::string 
SenderContext::Delivery::error() +{ + pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token)); + return (condition && pn_condition_is_set(condition)) ? + Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) : + std::string(); +} + void SenderContext::Delivery::settle() { pn_delivery_settle(token); @@ -570,10 +597,12 @@ void SenderContext::verify() helper.checkAssertion(target, AddressHelper::FOR_SENDER); } + void SenderContext::configure() { - configure(pn_link_target(sender)); + if (sender) configure(pn_link_target(sender)); } + void SenderContext::configure(pn_terminus_t* target) { helper.configure(sender, target, AddressHelper::FOR_SENDER); @@ -603,12 +632,10 @@ Address SenderContext::getAddress() cons void SenderContext::reset(pn_session_t* session) { - sender = pn_sender(session, name.c_str()); - configure(); - - for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) { + sender = session ? pn_sender(session, name.c_str()) : 0; + if (sender) configure(); + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) i->reset(); - } } void SenderContext::resend() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Tue Mar 3 14:58:01 2015 @@ -24,6 +24,7 @@ #include <deque> #include <string> #include <vector> +#include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/AddressHelper.h" @@ -41,9 +42,10 @@ class 
Message; class MessageImpl; namespace amqp { -/** - * - */ + +class Transaction; + + class SenderContext { public: @@ -52,13 +54,15 @@ class SenderContext public: Delivery(int32_t id); void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField); - void send(pn_link_t*, bool unreliable); + void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant()); bool delivered(); bool accepted(); bool rejected(); void settle(); void reset(); bool sent() const; + pn_delivery_t* getToken() const { return token; } + std::string error(); private: int32_t id; pn_delivery_t* token; @@ -66,22 +70,32 @@ class SenderContext bool presettled; }; - SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend); + typedef boost::shared_ptr<Transaction> CoordinatorPtr; + + SenderContext(pn_session_t* session, const std::string& name, + const qpid::messaging::Address& target, + bool setToOnSend, + const CoordinatorPtr& transaction = CoordinatorPtr()); ~SenderContext(); - void reset(pn_session_t* session); - void close(); - void setCapacity(uint32_t); - uint32_t getCapacity(); - uint32_t getUnsettled(); - const std::string& getName() const; - const std::string& getTarget() const; - bool send(const qpid::messaging::Message& message, Delivery**); - void configure(); - void verify(); - void check(); - bool settled(); - bool closed(); - Address getAddress() const; + + virtual void reset(pn_session_t* session); + virtual void close(); + virtual void setCapacity(uint32_t); + virtual uint32_t getCapacity(); + virtual uint32_t getUnsettled(); + virtual const std::string& getName() const; + virtual const std::string& getTarget() const; + virtual bool send(const qpid::messaging::Message& message, Delivery**); + virtual void configure(); + virtual void verify(); + virtual void check(); + virtual bool settled(); + virtual bool closed(); + virtual Address getAddress() const; + + 
protected: + pn_link_t* sender; + private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; @@ -89,12 +103,12 @@ class SenderContext const std::string name; qpid::messaging::Address address; AddressHelper helper; - pn_link_t* sender; int32_t nextId; Deliveries deliveries; uint32_t capacity; bool unreliable; bool setToOnSend; + boost::shared_ptr<Transaction> transaction; uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Tue Mar 3 14:58:01 2015 @@ -39,7 +39,8 @@ SenderHandle::SenderHandle(boost::shared void SenderHandle::send(const Message& message, bool sync) { - connection->send(session, sender, message, sync); + SenderContext::Delivery* d = 0; + connection->send(session, sender, message, sync, &d); } void SenderHandle::close() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Mar 3 14:58:01 2015 @@ -21,11 +21,15 @@ #include "SessionContext.h" #include "SenderContext.h" #include "ReceiverContext.h" 
+#include "Transaction.h" +#include "PnData.h" #include <boost/format.hpp> #include "qpid/messaging/Address.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" +#include "qpid/amqp/descriptors.h" + extern "C" { #include <proton/engine.h> } @@ -35,23 +39,32 @@ namespace messaging { namespace amqp { SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} + SessionContext::~SessionContext() { - senders.clear(); receivers.clear(); - pn_session_free(session); + // Clear all pointers to senders and receivers before we free the session. + senders.clear(); + receivers.clear(); + transaction.reset(); // Transaction is a sender. + if (!error && session) + pn_session_free(session); } boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend) { + error.raise(); std::string name = AddressHelper::getLinkName(address); - if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection"); - boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend)); + if (senders.find(name) != senders.end()) + throw LinkError("Link name must be unique within the scope of the connection"); + boost::shared_ptr<SenderContext> s( + new SenderContext(session, name, address, setToOnSend, transaction)); senders[name] = s; return s; } boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address) { + error.raise(); std::string name = AddressHelper::getLinkName(address); if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection"); boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); @@ -61,6 +74,7 @@ boost::shared_ptr<ReceiverContext> Sessi boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const 
{ + error.raise(); SenderMap::const_iterator i = senders.find(name); if (i == senders.end()) { throw qpid::messaging::KeyError(std::string("No such sender") + name); @@ -71,6 +85,7 @@ boost::shared_ptr<SenderContext> Session boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const { + error.raise(); ReceiverMap::const_iterator i = receivers.find(name); if (i == receivers.end()) { throw qpid::messaging::KeyError(std::string("No such receiver") + name); @@ -81,16 +96,19 @@ boost::shared_ptr<ReceiverContext> Sessi void SessionContext::removeReceiver(const std::string& n) { + error.raise(); receivers.erase(n); } void SessionContext::removeSender(const std::string& n) { + error.raise(); senders.erase(n); } boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver() { + error.raise(); for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { if (i->second->hasCurrent()) { return i->second; @@ -102,16 +120,19 @@ boost::shared_ptr<ReceiverContext> Sessi uint32_t SessionContext::getReceivable() { + error.raise(); return 0;//TODO } uint32_t SessionContext::getUnsettledAcks() { + error.raise(); return 0;//TODO } qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { + error.raise(); qpid::framing::SequenceNumber id = next++; if (!pn_delivery_settled(delivery)) unacked[id] = delivery; @@ -121,22 +142,32 @@ qpid::framing::SequenceNumber SessionCon void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) { + error.raise(); for (DeliveryMap::iterator i = begin; i != end; ++i) { - QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); - pn_delivery_update(i->second, PN_ACCEPTED); - pn_delivery_settle(i->second);//TODO: different settlement modes? 
+ types::Variant txState; + if (transaction) { + QPID_LOG(trace, "Setting disposition for transactional delivery " + << i->first << " -> " << i->second); + transaction->acknowledge(i->second); + } else { + QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second); + pn_delivery_update(i->second, PN_ACCEPTED); + pn_delivery_settle(i->second); //TODO: different settlement modes? + } } unacked.erase(begin, end); } void SessionContext::acknowledge() { + error.raise(); QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); acknowledge(unacked.begin(), unacked.end()); } void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) { + error.raise(); QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative); DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { @@ -149,6 +180,7 @@ void SessionContext::acknowledge(const q void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject) { + error.raise(); DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { if (reject) { @@ -166,7 +198,9 @@ void SessionContext::nack(const qpid::fr bool SessionContext::settled() { + error.raise(); bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { try { if (!i->second->closed() && !i->second->settled()) result = false; @@ -189,8 +223,25 @@ std::string SessionContext::getName() co void SessionContext::reset(pn_connection_t* connection) { - session = pn_session(connection); unacked.clear(); + if (transaction) { + if (transaction->isCommitting()) + error = new TransactionUnknown("Transaction outcome unknown: transport failure"); + else + error = new TransactionAborted("Transaction aborted: transport failure"); + resetSession(0); + senders.clear(); + receivers.clear(); + transaction.reset(); + return; + } + resetSession(pn_session(connection)); + +} + +void 
SessionContext::resetSession(pn_session_t* session_) { + session = session_; + if (transaction) transaction->reset(session); for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { i->second->reset(session); } @@ -198,4 +249,6 @@ void SessionContext::reset(pn_connection i->second->reset(session); } } + + }}} // namespace qpid::messaging::amqp Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Tue Mar 3 14:58:01 2015 @@ -26,6 +26,7 @@ #include <boost/shared_ptr.hpp> #include "qpid/sys/IntegerTypes.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/ExceptionHolder.h" struct pn_connection_t; struct pn_session_t; @@ -42,6 +43,8 @@ namespace amqp { class ConnectionContext; class SenderContext; class ReceiverContext; +class Transaction; + /** * */ @@ -63,23 +66,29 @@ class SessionContext bool settled(); void setName(const std::string&); std::string getName() const; + + void nack(const qpid::framing::SequenceNumber& id, bool reject); + private: friend class ConnectionContext; typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap; typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap; + pn_session_t* session; SenderMap senders; + boost::shared_ptr<Transaction> transaction; ReceiverMap receivers; DeliveryMap unacked; qpid::framing::SequenceNumber next; std::string name; + sys::ExceptionHolder error; qpid::framing::SequenceNumber 
record(pn_delivery_t*); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); - void nack(const qpid::framing::SequenceNumber& id, bool reject); + void resetSession(pn_session_t*); }; }}} // namespace qpid::messaging::amqp Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Tue Mar 3 14:58:01 2015 @@ -42,12 +42,12 @@ SessionHandle::SessionHandle(boost::shar void SessionHandle::commit() { - + connection->commit(session); } void SessionHandle::rollback() { - + connection->rollback(session); } void SessionHandle::acknowledge(bool /*sync*/) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Mar 3 14:58:01 2015 @@ -36,17 +36,20 @@ namespace sys { struct ProtocolTimeoutTask : public sys::TimerTask { AsynchIOHandler& handler; std::string id; + Duration timeout; - ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) : - TimerTask(timeout, "ProtocolTimeout"), + ProtocolTimeoutTask(const 
std::string& i, const Duration& timeout_, AsynchIOHandler& h) : + TimerTask(timeout_, "ProtocolTimeout"), handler(h), - id(i) + id(i), + timeout(timeout_) {} void fire() { // If this fires it means that we didn't negotiate the connection in the timeout period // Schedule closing the connection for the io thread - QPID_LOG(error, "Connection " << id << " No protocol received closing"); + QPID_LOG(error, "Connection " << id << " No protocol received after " << timeout + << ", closing"); handler.abort(); } }; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp Tue Mar 3 14:58:01 2015 @@ -43,21 +43,23 @@ class VariantImpl { public: VariantImpl(); - VariantImpl(bool); - VariantImpl(uint8_t); - VariantImpl(uint16_t); - VariantImpl(uint32_t); - VariantImpl(uint64_t); - VariantImpl(int8_t); - VariantImpl(int16_t); - VariantImpl(int32_t); - VariantImpl(int64_t); - VariantImpl(float); - VariantImpl(double); - VariantImpl(const std::string&, const std::string& encoding=std::string()); - VariantImpl(const Variant::Map&); - VariantImpl(const Variant::List&); - VariantImpl(const Uuid&); + void reset(); + void set(bool); + void set(uint8_t); + void set(uint16_t); + void set(uint32_t); + void set(uint64_t); + void set(int8_t); + void set(int16_t); + void set(int32_t); + void set(int64_t); + void set(float); + void set(double); + void set(const std::string&, const std::string& encoding=std::string()); + void set(const Variant::Map&); + void set(const Variant::List&); + void set(const Uuid&); + void set(const Variant&); ~VariantImpl(); VariantType getType() 
const; @@ -90,9 +92,10 @@ class VariantImpl bool isEqualTo(VariantImpl&) const; bool isEquivalentTo(VariantImpl&) const; - static VariantImpl* create(const Variant&); + Variant::List descriptors; // Optional descriptors for described value. + private: - const VariantType type; + VariantType type; union { bool b; uint8_t ui8; @@ -110,7 +113,7 @@ class VariantImpl Variant::List* list; std::string* string; } value; - std::string encoding;//optional encoding for variable length data + std::string encoding; // Optional encoding for variable length data. template<class T> T convertFromString() const { @@ -136,26 +139,34 @@ class VariantImpl }; +VariantImpl::VariantImpl() : type(VAR_VOID) {} + +void VariantImpl::set(bool b) { reset(); type = VAR_BOOL; value.b = b; } +void VariantImpl::set(uint8_t i) { reset(); type = VAR_UINT8; value.ui8 = i; } +void VariantImpl::set(uint16_t i) { reset(); type = VAR_UINT16; value.ui16 = i; } +void VariantImpl::set(uint32_t i) { reset(); type = VAR_UINT32; value.ui32 = i; } +void VariantImpl::set(uint64_t i) { reset(); type = VAR_UINT64; value.ui64 = i; } +void VariantImpl::set(int8_t i) { reset(); type = VAR_INT8; value.i8 = i; } +void VariantImpl::set(int16_t i) { reset(); type = VAR_INT16; value.i16 = i; } +void VariantImpl::set(int32_t i) { reset(); type = VAR_INT32; value.i32 = i; } +void VariantImpl::set(int64_t i) { reset(); type = VAR_INT64; value.i64 = i; } +void VariantImpl::set(float f) { reset(); type = VAR_FLOAT; value.f = f; } +void VariantImpl::set(double d) { reset(); type = VAR_DOUBLE; value.d = d; } +void VariantImpl::set(const std::string& s, const std::string& e) { reset(); type = VAR_STRING; encoding = e; value.string = new std::string(s); } + +void VariantImpl::set(const Variant::Map& m) { + reset(); + type = VAR_MAP; + value.map = new Variant::Map(m); +} + +void VariantImpl::set(const Variant::List& l) { reset(); type = VAR_LIST; value.list = new Variant::List(l); } + +void VariantImpl::set(const Uuid& u) { reset(); 
type = VAR_UUID; value.uuid = new Uuid(u); } -VariantImpl::VariantImpl() : type(VAR_VOID) { value.i64 = 0; } -VariantImpl::VariantImpl(bool b) : type(VAR_BOOL) { value.b = b; } -VariantImpl::VariantImpl(uint8_t i) : type(VAR_UINT8) { value.ui8 = i; } -VariantImpl::VariantImpl(uint16_t i) : type(VAR_UINT16) { value.ui16 = i; } -VariantImpl::VariantImpl(uint32_t i) : type(VAR_UINT32) { value.ui32 = i; } -VariantImpl::VariantImpl(uint64_t i) : type(VAR_UINT64) { value.ui64 = i; } -VariantImpl::VariantImpl(int8_t i) : type(VAR_INT8) { value.i8 = i; } -VariantImpl::VariantImpl(int16_t i) : type(VAR_INT16) { value.i16 = i; } -VariantImpl::VariantImpl(int32_t i) : type(VAR_INT32) { value.i32 = i; } -VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; } -VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; } -VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; } -VariantImpl::VariantImpl(const std::string& s, const std::string& e) - : type(VAR_STRING), encoding(e) { value.string = new std::string(s); } -VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.map = new Variant::Map(m); } -VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.list = new Variant::List(l); } -VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.uuid = new Uuid(u); } +VariantImpl::~VariantImpl() { reset(); } -VariantImpl::~VariantImpl() { +void VariantImpl::reset() { switch (type) { case VAR_STRING: delete value.string; @@ -172,6 +183,7 @@ VariantImpl::~VariantImpl() { default: break; } + type = VAR_VOID; } VariantType VariantImpl::getType() const { return type; } @@ -637,46 +649,50 @@ bool isIntegerType(VariantType type) } } -VariantImpl* VariantImpl::create(const Variant& v) +void VariantImpl::set(const Variant& v) { switch (v.getType()) { - case VAR_BOOL: return new VariantImpl(v.asBool()); - case VAR_UINT8: return new VariantImpl(v.asUint8()); - case VAR_UINT16: return new 
VariantImpl(v.asUint16()); - case VAR_UINT32: return new VariantImpl(v.asUint32()); - case VAR_UINT64: return new VariantImpl(v.asUint64()); - case VAR_INT8: return new VariantImpl(v.asInt8()); - case VAR_INT16: return new VariantImpl(v.asInt16()); - case VAR_INT32: return new VariantImpl(v.asInt32()); - case VAR_INT64: return new VariantImpl(v.asInt64()); - case VAR_FLOAT: return new VariantImpl(v.asFloat()); - case VAR_DOUBLE: return new VariantImpl(v.asDouble()); - case VAR_STRING: return new VariantImpl(v.asString(), v.getEncoding()); - case VAR_MAP: return new VariantImpl(v.asMap()); - case VAR_LIST: return new VariantImpl(v.asList()); - case VAR_UUID: return new VariantImpl(v.asUuid()); - default: return new VariantImpl(); - } -} - -Variant::Variant() : impl(new VariantImpl()) {} -Variant::Variant(bool b) : impl(new VariantImpl(b)) {} -Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(uint32_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(uint64_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int8_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int16_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int32_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(int64_t i) : impl(new VariantImpl(i)) {} -Variant::Variant(float f) : impl(new VariantImpl(f)) {} -Variant::Variant(double d) : impl(new VariantImpl(d)) {} -Variant::Variant(const std::string& s) : impl(new VariantImpl(s)) {} -Variant::Variant(const char* s) : impl(new VariantImpl(std::string(s))) {} -Variant::Variant(const Map& m) : impl(new VariantImpl(m)) {} -Variant::Variant(const List& l) : impl(new VariantImpl(l)) {} -Variant::Variant(const Variant& v) : impl(VariantImpl::create(v)) {} -Variant::Variant(const Uuid& u) : impl(new VariantImpl(u)) {} + case VAR_BOOL: set(v.asBool()); break; + case VAR_UINT8: set(v.asUint8()); break; + case VAR_UINT16: set(v.asUint16()); break; + case VAR_UINT32: 
set(v.asUint32()); break; + case VAR_UINT64: set(v.asUint64()); break; + case VAR_INT8: set(v.asInt8()); break; + case VAR_INT16: set(v.asInt16()); break; + case VAR_INT32: set(v.asInt32()); break; + case VAR_INT64: set(v.asInt64()); break; + case VAR_FLOAT: set(v.asFloat()); break; + case VAR_DOUBLE: set(v.asDouble()); break; + case VAR_STRING: set(v.asString(), v.getEncoding()); break; + case VAR_MAP: set(v.asMap()); break; + case VAR_LIST: set(v.asList()); break; + case VAR_UUID: set(v.asUuid()); break; + default: reset(); + } + encoding = v.getEncoding(); + descriptors = v.getDescriptors(); +} + +Variant::Variant() : impl(0) {} +Variant::Variant(bool b) : impl(new VariantImpl()) { impl->set(b); } +Variant::Variant(uint8_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(uint16_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(uint32_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(uint64_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int8_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int16_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int32_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(int64_t i) : impl(new VariantImpl()) { impl->set(i); } +Variant::Variant(float f) : impl(new VariantImpl()) { impl->set(f); } +Variant::Variant(double d) : impl(new VariantImpl()) { impl->set(d); } +Variant::Variant(const std::string& s) : impl(new VariantImpl()) { impl->set(s); } +Variant::Variant(const std::string& s, const std::string& encoding) : impl(new VariantImpl()) { impl->set(s, encoding); } +Variant::Variant(const char* s) : impl(new VariantImpl()) { impl->set(std::string(s)); } +Variant::Variant(const char* s, const char* encoding) : impl(new VariantImpl()) { impl->set(std::string(s), std::string(encoding)); } +Variant::Variant(const Map& m) : impl(new VariantImpl()) { impl->set(m); } +Variant::Variant(const List& l) : impl(new 
VariantImpl()) { impl->set(l); } +Variant::Variant(const Variant& v) : impl(new VariantImpl()) { impl->set(v); } +Variant::Variant(const Uuid& u) : impl(new VariantImpl()) { impl->set(u); } Variant::~Variant() { if (impl) delete impl; } @@ -686,116 +702,105 @@ void Variant::reset() impl = 0; } +namespace { +VariantImpl* assure(VariantImpl*& ptr) { + if (!ptr) ptr = new VariantImpl(); + return ptr; +} +} Variant& Variant::operator=(bool b) { - if (impl) delete impl; - impl = new VariantImpl(b); + assure(impl)->set(b); return *this; } Variant& Variant::operator=(uint8_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(uint16_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(uint32_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(uint64_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int8_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int16_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int32_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(int64_t i) { - if (impl) delete impl; - impl = new VariantImpl(i); + assure(impl)->set(i); return *this; } Variant& Variant::operator=(float f) { - if (impl) delete impl; - impl = new VariantImpl(f); + assure(impl)->set(f); return *this; } Variant& Variant::operator=(double d) { - if (impl) delete impl; - impl = new VariantImpl(d); + assure(impl)->set(d); return *this; } Variant& Variant::operator=(const std::string& s) { - if (impl) delete impl; - impl = new VariantImpl(s); + 
assure(impl)->set(s); return *this; } Variant& Variant::operator=(const char* s) { - if (impl) delete impl; - impl = new VariantImpl(std::string(s)); + assure(impl)->set(std::string(s)); return *this; } Variant& Variant::operator=(const Uuid& u) { - if (impl) delete impl; - impl = new VariantImpl(u); + assure(impl)->set(u); return *this; } Variant& Variant::operator=(const Map& m) { - if (impl) delete impl; - impl = new VariantImpl(m); + assure(impl)->set(m); return *this; } Variant& Variant::operator=(const List& l) { - if (impl) delete impl; - impl = new VariantImpl(l); + assure(impl)->set(l); return *this; } Variant& Variant::operator=(const Variant& v) { - if (impl) delete impl; - impl = VariantImpl::create(v); + assure(impl)->set(v); return *this; } @@ -841,8 +846,7 @@ Variant::List& Variant::asList() { if (! const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); } std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); } void Variant::setEncoding(const std::string& s) { - if (!impl) impl = new VariantImpl(); - impl->setEncoding(s); + assure(impl)->setEncoding(s); } const std::string& Variant::getEncoding() const { return impl ? 
impl->getEncoding() : EMPTY; } @@ -884,6 +888,12 @@ std::ostream& operator<<(std::ostream& o std::ostream& operator<<(std::ostream& out, const Variant& value) { + // Print the descriptors + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) + out << "@" << *i << " "; + + // Print the value switch (value.getType()) { case VAR_MAP: out << value.asMap(); @@ -910,7 +920,43 @@ bool operator!=(const Variant& a, const bool Variant::isEqualTo(const Variant& other) const { + if (isVoid() && other.isVoid()) return true; + if (isVoid() || other.isVoid()) return false; return impl && impl->isEqualTo(*other.impl); } +bool Variant::isDescribed() const { + return impl && !impl->descriptors.empty(); +} + +Variant::List& Variant::getDescriptors() { + return assure(impl)->descriptors; +} + +const Variant::List& Variant::getDescriptors() const { + return assure(impl)->descriptors; +} + +Variant Variant::getDescriptor() const { + if (getDescriptors().size() > 0) return getDescriptors().front(); + else return Variant(); +} + +void Variant::setDescriptor(const Variant& descriptor) { + getDescriptors().clear(); + getDescriptors().push_back(descriptor); +} + +Variant Variant::described(const Variant& descriptor, const Variant& value) { + Variant described(value); + described.setDescriptor(descriptor); + return described; +} + +Variant Variant::described(const Variant& descriptor, const List& value) { + Variant described(value); + described.setDescriptor(descriptor); + return described; +} + }} // namespace qpid::types Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h Tue Mar 3 14:58:01 2015 @@ -23,11 +23,13 @@ */ namespace qpid { namespace types { + namespace encodings { const std::string BINARY("binary"); const std::string UTF8("utf8"); const std::string ASCII("ascii"); } + }} // namespace qpid::types #endif /*!QPID_TYPES_ENCODINGS_H*/ Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -7,4 +7,4 @@ /qpid/branches/java-network-refactor/qpid/cpp/src/tests:805429-825319 /qpid/branches/qpid-2935/qpid/cpp/src/tests:1061302-1072333 /qpid/branches/qpid-3346/qpid/cpp/src/tests:1144319-1179855 -/qpid/trunk/qpid/cpp/src/tests:1643238-1658732 +/qpid/trunk/qpid/cpp/src/tests:1643238-1663687 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h Tue Mar 3 14:58:01 2015 @@ -101,11 +101,13 @@ struct BrokerFixture : private boost::n opts.auth=false; // Argument parsing - std::vector<const char*> argv(args.size()); - for (size_t i = 0; i<args.size(); ++i) - argv[i] = args[i].c_str(); - Plugin::addOptions(opts); - opts.parse(argv.size(), &argv[0]); + if (args.size() > 0) { + std::vector<const char*> argv(args.size()); + for (size_t i = 0; i<args.size(); ++i) + argv[i] = args[i].c_str(); + Plugin::addOptions(opts); + opts.parse(argv.size(), &argv[0]); + } broker = Broker::create(opts); // TODO aconway 
2007-12-05: At one point BrokerFixture // tests could hang in Connection ctor if the following Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt Tue Mar 3 14:58:01 2015 @@ -360,6 +360,11 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windo # paged queue not yet implemented for windows add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix}) endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) + +if (BUILD_AMQP) + add_test (interop_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interop_tests.py) +endif (BUILD_AMQP) + add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py) add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py) if (BUILD_AMQP) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp Tue Mar 3 14:58:01 2015 @@ -18,14 +18,16 @@ * under the License. 
* */ -#include <iostream> -#include "qpid/types/Variant.h" -#include "qpid/amqp_0_10/Codecs.h" #include "unit_test.h" +#include "qpid/types/Variant.h" +#include "qpid/amqp_0_10/Codecs.h" +#include <boost/assign.hpp> +#include <iostream> using namespace qpid::types; using namespace qpid::amqp_0_10; +using boost::assign::list_of; namespace qpid { namespace tests { @@ -807,6 +809,22 @@ QPID_AUTO_TEST_CASE(parse) BOOST_CHECK(a.getType()==types::VAR_DOUBLE); } +QPID_AUTO_TEST_CASE(described) +{ + Variant a; + BOOST_CHECK(!a.isDescribed()); + a.getDescriptors().push_back("foo"); + BOOST_CHECK(a.isDescribed()); + BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")); + a = 42; + BOOST_CHECK(a.isDescribed()); + BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")); + a.getDescriptors().push_back(33); + BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")(33)); + a.getDescriptors().clear(); + BOOST_CHECK(!a.isDescribed()); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py Tue Mar 3 14:58:01 2015 @@ -21,9 +21,9 @@ import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal +import proton from qpid import connection, util from qpid.compat import format_exc -from qpid.harness import Skipped from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition @@ -49,13 +49,18 @@ from qpidtoollibs import BrokerAgent import qpid.messaging qm = qpid.messaging qpid_messaging = None + +def 
env_has_log_config(): + """True if there are qpid log configuration settings in the environment.""" + return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ + if not os.environ.get("QPID_PY_NO_SWIG"): try: import qpid_messaging from qpid.datatypes import uuid4 qm = qpid_messaging # Silence warnings from swigged messaging library unless enabled in environment. - if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ: + if not env_has_log_config(): qm.Logger.configure(["--log-enable=error"]) except ImportError: print "Cannot load python SWIG bindings, falling back to native qpid.messaging." @@ -136,7 +141,7 @@ _popen_id = AtomicCounter() # Popen iden # Constants for file descriptor arguments to Popen FILE = "FILE" # Write to file named after process -PIPE = subprocess.PIPE +from subprocess import PIPE, STDOUT class Popen(subprocess.Popen): """ @@ -202,7 +207,7 @@ class Popen(subprocess.Popen): def communicate(self, input=None): ret = subprocess.Popen.communicate(self, input) - self.cleanup() + self._cleanup() return ret def is_running(self): return self.poll() is None @@ -254,6 +259,7 @@ class Popen(subprocess.Popen): def cmd_str(self): return " ".join([str(s) for s in self.cmd]) + def checkenv(name): value = os.getenv(name) if not value: raise Exception("Environment variable %s is not set" % name) @@ -308,7 +314,7 @@ class Broker(Popen): cmd += ["--log-to-stderr=no"] # Add default --log-enable arguments unless args already has --log arguments. 
- if not [l for l in args if l.startswith("--log")]: + if not env_has_log_config() and not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+"] if test_store: cmd += ["--load-module", BrokerTest.test_store_lib, @@ -444,10 +450,11 @@ def browse(session, queue, timeout=0, tr finally: r.close() -def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): +def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" + if msg is None: msg = "browse '%s' failed" % queue actual_contents = browse(session, queue, timeout, transform=transform) if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg @@ -486,6 +493,18 @@ class BrokerTest(TestCase): test_store_lib = os.getenv("TEST_STORE_LIB") rootdir = os.getcwd() + PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR) + PN_TX_VERSION = (0, 9) + + amqp_tx_supported = PN_VERSION >= PN_TX_VERSION + + @classmethod + def amqp_tx_warning(cls): + if not cls.amqp_tx_supported: + print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION) + return False + return True + def configure(self, config): self.config=config def setUp(self): @@ -498,8 +517,8 @@ class BrokerTest(TestCase): if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0" else: default_protocol="amqp0-10" self.protocol = defs.get("PROTOCOL") or default_protocol - self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0 - + self.tx_protocol = self.protocol + if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10" def tearDown(self): err = [] @@ -530,15 +549,22 @@ class BrokerTest(TestCase): self.teardown_add(p) return p - def broker(self, args=[], name=None, 
expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd) + b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b + def check_output(self, args, stdin=None): + p = self.popen(args, stdout=PIPE, stderr=STDOUT) + out = p.communicate(stdin) + if p.returncode != 0: + raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0])) + return out[0] + def browse(self, *args, **kwargs): browse(*args, **kwargs) def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py Tue Mar 3 14:58:01 2015 @@ -24,6 +24,7 @@ from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent +from qpid.harness import Skipped log = getLogger(__name__) @@ -129,12 +130,9 @@ class HaBroker(Broker): args += ["--load-module", BrokerTest.ha_lib, # Non-standard settings for faster tests. 
"--link-maintenance-interval=0.1", - # Heartbeat and negotiate time are needed so that a broker wont - # stall on an address that doesn't currently have a broker running. - "--max-negotiate-time=1000", "--ha-cluster=%s"%ha_cluster] # Add default --log-enable arguments unless args already has --log arguments. - if not [l for l in args if l.startswith("--log")]: + if not env_has_log_config() and not [l for l in args if l.startswith("--log")]: args += ["--log-enable=info+", "--log-enable=debug+:ha::"] if not [h for h in args if h.startswith("--link-heartbeat-interval")]: args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)] @@ -159,13 +157,20 @@ acl allow all all Broker.__init__(self, test, args, port=ha_port.port, **kwargs) # Do some static setup to locate the qpid-config and qpid-ha tools. - qpid_ha_script=import_script(os.path.join(os.getenv("PYTHON_COMMANDS"),"qpid-ha")) - qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") - assert os.path.isfile(qpid_config_path) + @property + def qpid_ha_script(self): + if not hasattr(self, "_qpid_ha_script"): + qpid_ha_exec = os.getenv("QPID_HA_EXEC") + if not qpid_ha_exec or not os.path.isfile(qpid_ha_exec): + raise Skipped("qpid-ha not available") + self._qpid_ha_script = import_script(qpid_ha_exec) + return self._qpid_ha_script def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port()) def qpid_ha(self, args): + if not self.qpid_ha_script: + raise Skipped("qpid-ha not available") try: cred = self.client_credentials url = self.host_port() @@ -195,33 +200,37 @@ acl allow all all def ha_status(self): return self.qmf().status - def wait_status(self, status, timeout=5): + def wait_status(self, status, timeout=10): + def try_get_status(): self._status = "<unknown>" - # Ignore ConnectionError, the broker may not be up yet. 
try: self._status = self.ha_status() - return self._status == status; - except qm.ConnectionError: return False + except qm.ConnectionError, e: + # Record the error but don't raise, the broker may not be up yet. + self._status = "%s: %s" % (type(e).__name__, e) + return self._status == status; assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( self, status, self._status) - def wait_queue(self, queue, timeout=1, msg="wait_queue"): + def wait_queue(self, queue, timeout=10, msg="wait_queue"): """ Wait for queue to be visible via QMF""" agent = self.agent - assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue + assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), \ + "%s queue %s not present" % (msg, queue) - def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"): + def wait_no_queue(self, queue, timeout=10, msg="wait_no_queue"): """ Wait for queue to be invisible via QMF""" agent = self.agent assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue) - # TODO aconway 2012-05-01: do direct python call to qpid-config code. 
def qpid_config(self, args): + qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") + if not qpid_config_exec or not os.path.isfile(qpid_config_exec): + raise Skipped("qpid-config not available") assert subprocess.call( - [self.qpid_config_path, "--broker", self.host_port()]+args, - stdout=1, stderr=subprocess.STDOUT - ) == 0 + [qpid_config_exec, "--broker", self.host_port()]+args, stdout=1, stderr=subprocess.STDOUT + ) == 0, "qpid-config failed" def config_replicate(self, from_broker, queue): self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) @@ -325,7 +334,7 @@ class HaCluster(object): ha_port = self._ports[i] b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name, args=args, **self.kwargs) - b.ready(timeout=5) + b.ready(timeout=10) return b def start(self): Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py Tue Mar 3 14:58:01 2015 @@ -1025,8 +1025,8 @@ class LongTests(HaBrokerTest): "--broker", brokers[0].host_port(), "--address", "q;{create:always}", "--messages=1000", - "--tx=10" - # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet + "--tx=10", + "--connection-options={protocol:%s}" % self.tx_protocol ]) receiver = self.popen( ["qpid-receive", @@ -1034,8 +1034,8 @@ class LongTests(HaBrokerTest): "--address", "q;{create:always}", "--messages=990", "--timeout=10", - "--tx=10" - # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet + "--tx=10", + "--connection-options={protocol:%s}" % self.tx_protocol ]) self.assertEqual(sender.wait(), 0) self.assertEqual(receiver.wait(), 0) @@ -1268,7 +1268,7 @@ class 
StoreTests(HaBrokerTest): """Verify that a backup erases queue data from store recovery before doing catch-up from the primary.""" if self.check_skip(): return - cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store']) + cluster = HaCluster(self, 2) sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session() s1 = sn.sender("q1;{create:always,node:{durable:true}}") for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True)) @@ -1532,7 +1532,7 @@ class TransactionTests(HaBrokerTest): except qm.TransactionUnknown: pass for b in cluster: self.assert_tx_clean(b) try: tx.connection.close() - except TransactionUnknown: pass # Occasionally get exception on close. + except qm.TransactionUnknown: pass # Occasionally get exception on close. finally: l.restore() def test_tx_no_backups(self): @@ -1622,21 +1622,26 @@ class TransactionTests(HaBrokerTest): import qpid_tests.broker_0_10 except ImportError: raise Skipped("Tests not found") - cluster = HaCluster(self, 3) - self.popen(["qpid-txtest", "-p%s"%cluster[0].port()]).assert_exit_ok() + if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"] + self.popen(["qpid-txtest2", "--broker", cluster[0].host_port()]).assert_exit_ok() + print self.popen(["qpid-python-test", "-m", "qpid_tests.broker_0_10", + "-m", "qpid_tests.broker_1_0", "-b", "localhost:%s"%(cluster[0].port()), - "*.tx.*"]).assert_exit_ok() + "*.tx.*"], stdout=None, stderr=None).assert_exit_ok() if __name__ == "__main__": - outdir = "ha_tests.tmp" - shutil.rmtree(outdir, True) - qpid_ha = os.getenv("QPID_HA_EXEC") - if qpid_ha and os.path.exists(qpid_ha): + qpid_ha_exec = os.getenv("QPID_HA_EXEC") + if qpid_ha_exec and os.path.isfile(qpid_ha_exec): + BrokerTest.amqp_tx_warning() + outdir = "ha_tests.tmp" + shutil.rmtree(outdir, True) os.execvp("qpid-python-test", - ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir] + ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:]) else: - print "Skipping 
ha_tests, %s not available"%(qpid_ha) + print "Skipping ha_tests, qpid-ha not available" + + Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py Tue Mar 3 14:58:01 2015 @@ -88,6 +88,7 @@ class AmqpBrokerTest(BrokerTest): result = self.popen(cmd, stdout=PIPE) r.fetch(timeout=1) # wait until receiver is actually ready s.acknowledge() + r.close() s.close() return result Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp Tue Mar 3 14:58:01 2015 @@ -197,7 +197,7 @@ int main(int argc, char ** argv) std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = opts.tx ? 
connection.createTransactionalSession() : connection.createSession(); Receiver receiver = session.createReceiver(opts.address); - receiver.setCapacity(opts.capacity); + receiver.setCapacity(std::min(opts.capacity, opts.messages)); Message msg; uint count = 0; uint txCount = 0; @@ -207,9 +207,9 @@ int main(int argc, char ** argv) Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader); if (!opts.readyAddress.empty()) { session.createSender(opts.readyAddress).send(msg); - if (opts.tx) - session.commit(); - } + if (opts.tx) + session.commit(); + } // For receive rate calculation qpid::sys::AbsTime start = qpid::sys::now(); int64_t interval = 0; @@ -290,6 +290,7 @@ int main(int argc, char ** argv) connection.close(); return 0; } + return 1; } catch(const std::exception& error) { std::cerr << "qpid-receive: " << error.what() << std::endl; connection.close(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp Tue Mar 3 14:58:01 2015 @@ -112,14 +112,14 @@ struct Options : public qpid::Options log(argv0), reportTotal(false), reportEvery(0), - reportHeader(true), - sendRate(0), - sequence(true), - timestamp(true), - groupPrefix("GROUP-"), - groupSize(10), - groupRandSize(false), - groupInterleave(1) + reportHeader(true), + sendRate(0), + sequence(true), + timestamp(true), + groupPrefix("GROUP-"), + groupSize(10), + groupRandSize(false), + groupInterleave(1) { addOptions() ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") @@ -272,7 +272,7 @@ class MapContentGenerator : public Con // tag each generated message 
with a group identifer // class GroupGenerator { -public: + public: GroupGenerator(const std::string& key, const std::string& prefix, const uint size, @@ -351,7 +351,7 @@ int main(int argc, char ** argv) try { Options opts; if (opts.parse(argc, argv)) { - connection = Connection(opts.url, opts.connectionOptions); + connection = Connection(opts.url, opts.connectionOptions); connection.open(); std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0); Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession(); @@ -447,6 +447,7 @@ int main(int argc, char ** argv) connection.close(); return 0; } + return 1; } catch(const std::exception& error) { std::cerr << "qpid-send: " << error.what() << std::endl; connection.close(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp Tue Mar 3 14:58:01 2015 @@ -353,10 +353,11 @@ int main(int argc, char** argv) if (opts.init) controller.init(); if (opts.transfer) controller.transfer(); if (opts.check) return controller.check(); + return 0; } - return 0; + return 1; } catch(const std::exception& e) { - std::cout << argv[0] << ": " << e.what() << std::endl; + std::cerr << argv[0] << ": " << e.what() << std::endl; } return 2; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests?rev=1663719&r1=1663718&r2=1663719&view=diff 
============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests Tue Mar 3 14:58:01 2015 @@ -39,7 +39,8 @@ skip() { } start_broker() { - QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker" + rm -f swig_python_tests.log + QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no --log-to-file swig_python_tests.log) || fail "Could not start broker" } stop_broker() { @@ -54,9 +55,9 @@ echo "Running swigged python tests using export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG export QPID_USE_SWIG_CLIENT=1 -$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1 +$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests $* || FAILED=1 if [[ -a $AMQP_LIB ]] ; then - $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1 + $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests $* || FAILED=1 fi stop_broker if [[ $FAILED -eq 1 ]]; then Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in Tue Mar 3 14:58:01 2015 @@ -20,14 +20,14 @@ absdir() { echo `cd $1 && pwd`; } # Environment variables substituted by cmake. -srcdir=`absdir @abs_srcdir@` -builddir=`absdir @abs_builddir@` -top_srcdir=`absdir @abs_top_srcdir@` -top_builddir=`absdir @abs_top_builddir@` -moduledir=$top_builddir/src@builddir_lib_suffix@ -pythonswigdir=$top_builddir/bindings/qpid/python/ -pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@ -testmoduledir=$builddir@builddir_lib_suffix@ +export srcdir=`absdir @abs_srcdir@` +export builddir=`absdir @abs_builddir@` +export top_srcdir=`absdir @abs_top_srcdir@` +export top_builddir=`absdir @abs_top_builddir@` +export moduledir=$top_builddir/src@builddir_lib_suffix@ +export pythonswigdir=$top_builddir/bindings/qpid/python/ +export pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@ +export testmoduledir=$builddir@builddir_lib_suffix@ export QPID_INSTALL_PREFIX=@prefix@ # Tools substituted by cmake Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp Tue Mar 3 14:58:01 2015 @@ -223,27 +223,18 @@ class TestStore : public NullMessageStor const boost::intrusive_ptr<PersistableMessage>& pmsg, const 
PersistableQueue& queue) { - qpid::broker::amqp_0_10::MessageTransfer* msg = - dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get()); - assert(msg); - ostringstream o; - o << "<enqueue " << queue.getName() << " " << getContent(msg); + string data = getContent(pmsg); + o << "<enqueue " << queue.getName() << " " << data; if (tx) o << " tx=" << getId(*tx); o << ">"; log(o.str()); // Dump the message if there is a dump file. if (dump.get()) { - msg->getFrames().getMethod()->print(*dump); - *dump << endl << " "; - msg->getFrames().getHeaders()->print(*dump); - *dump << endl << " "; - *dump << msg->getFrames().getContentSize() << endl; + *dump << "Message(" << data.size() << "): " << data << endl; } string logPrefix = "TestStore "+name+": "; - // Check the message for special instructions for this store. - string data = msg->getFrames().getContent(); Action action(data); bool doComplete = true; if (action.index && action.executeIn(name)) { @@ -258,7 +249,7 @@ class TestStore : public NullMessageStor QPID_LOG(error, logPrefix << "async-id needs argument: " << data); break; } - asyncIds[action.args[0]] = msg; + asyncIds[action.args[0]] = pmsg; QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]); doComplete = false; break; @@ -284,7 +275,7 @@ class TestStore : public NullMessageStor QPID_LOG(error, logPrefix << "unknown action: " << data); } } - if (doComplete) msg->enqueueComplete(); + if (doComplete) pmsg->enqueueComplete(); } void dequeue(TransactionContext* tx, Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -3,4 +3,4 @@ /qpid/branches/java-network-refactor/qpid/python:805429-825319 /qpid/branches/qmfv2/qpid/python:902858,902894 /qpid/branches/qpid-2935/qpid/python:1061302-1072333 -/qpid/trunk/qpid/python:1643238-1659605 +/qpid/trunk/qpid/python:1643238-1663687 Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py Tue Mar 3 14:58:01 2015 @@ -89,7 +89,7 @@ class Client: self.password = password self.locale = locale self.tune_params = tune_params - self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties) + self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties, version_property_key="version") self.sasl_options = sasl_options self.socket = connect(self.host, self.port, connection_options) self.conn = Connection(self.socket, self.spec) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py Tue Mar 3 14:58:01 2015 @@ -21,26 +21,32 @@ from qpid.util import get_client_propert class UtilTest (TestCase): - def test_get_spec_recommended_client_properties(self): - client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"}) + def test_default_client_properties_08091(self): + client_properties = get_client_properties_with_defaults(version_property_key="version") self.assertTrue("product" in client_properties) self.assertTrue("version" in client_properties) self.assertTrue("platform" in client_properties) - def 
test_get_client_properties_with_provided_value(self): + def test_default_client_properties_010(self): + client_properties = get_client_properties_with_defaults(version_property_key="qpid.client_version") + self.assertTrue("product" in client_properties) + self.assertTrue("qpid.client_version" in client_properties) + self.assertTrue("platform" in client_properties) + + def test_client_properties_with_provided_value(self): client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"}) self.assertTrue("product" in client_properties) self.assertTrue("mykey" in client_properties) self.assertEqual("myvalue", client_properties["mykey"]) - def test_get_client_properties_with_no_provided_values(self): + def test_client_properties_with_provided_value_that_overrides_default(self): + client_properties = get_client_properties_with_defaults(provided_client_properties={"product":"myproduct"}) + self.assertEqual("myproduct", client_properties["product"]) + + def test_client_properties_with_no_provided_values(self): client_properties = get_client_properties_with_defaults(provided_client_properties=None) self.assertTrue("product" in client_properties) client_properties = get_client_properties_with_defaults() self.assertTrue("product" in client_properties) - def test_get_client_properties_with_provided_value_that_overrides_default(self): - client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"}) - self.assertEqual("myversion", client_properties["version"]) - Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py Tue 
Mar 3 14:58:01 2015 @@ -42,15 +42,24 @@ except ImportError: def close(self): self.sock.close() -def get_client_properties_with_defaults(provided_client_properties={}): +def get_client_properties_with_defaults(provided_client_properties={}, version_property_key="qpid.client_version"): ppid = 0 + version = "unidentified" try: ppid = os.getppid() except: pass + try: + import pkg_resources + pkg = pkg_resources.require("qpid-python") + if pkg and pkg[0] and pkg[0].version: + version = pkg[0].version + except: + pass + client_properties = {"product": "qpid python client", - "version": "development", + version_property_key : version, "platform": os.name, "qpid.client_process": os.path.basename(sys.argv and sys.argv[0] or ''), "qpid.client_pid": os.getpid(), Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py Tue Mar 3 14:58:01 2015 @@ -23,3 +23,4 @@ from general import * from legacy_exchanges import * from selector import * from translation import * +from tx import * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org