Author: kwall Date: Tue Mar 3 14:58:01 2015 New Revision: 1663719 URL: http://svn.apache.org/r1663719 Log: merge from trunk
Added: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake - copied unchanged from r1663687, qpid/trunk/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp - copied unchanged from r1663687, qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/Transaction.h - copied unchanged from r1663687, qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transaction.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/aix/ - copied from r1663687, qpid/trunk/qpid/cpp/src/qpid/sys/aix/ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interop_tests.py - copied unchanged from r1663687, qpid/trunk/qpid/cpp/src/tests/interop_tests.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py - copied unchanged from r1663687, qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py Removed: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake Modified: qpid/branches/QPID-6262-JavaBrokerNIO/ (props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/ (props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/ (props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt (contents, props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/ (props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ (props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/ (props changed) qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -3,4 +3,4 @@ /qpid/branches/java-broker-bdb-ha2:1576683-1583556 /qpid/branches/java-network-refactor:805429-825319 /qpid/branches/mcpierce-QPID-4719:1477004-1477093 -/qpid/trunk:1643238-1659605 +/qpid/trunk:1643238-1663687 Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -6,4 +6,4 @@ /qpid/branches/mcpierce-QPID-4719/qpid:1477004-1477093 /qpid/branches/qpid-2935/qpid:1061302-1072333 /qpid/branches/qpid-3346/qpid:1144319-1179855 -/qpid/trunk/qpid:1643238-1659605 +/qpid/trunk/qpid:1643238-1663687 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt Tue Mar 3 14:58:01 2015 @@ -23,7 +23,6 @@ set (CMAKE_BUILD_TYPE RelWithDebInfo CAC if (CMAKE_BUILD_TYPE MATCHES "Deb") set (has_debug_symbols " (has debug symbols)") endif (CMAKE_BUILD_TYPE MATCHES "Deb") -message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") project(qpid-cpp) @@ -242,5 +241,5 @@ add_subdirectory(examples) include (CPack) # Build type message again, last so it is visible at end of output. -message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") +message(STATUS "Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}") Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h Tue Mar 3 14:58:01 2015 @@ -89,7 +89,9 @@ class QPID_TYPES_CLASS_EXTERN Variant QPID_TYPES_EXTERN Variant(float); QPID_TYPES_EXTERN Variant(double); QPID_TYPES_EXTERN Variant(const std::string&); + QPID_TYPES_EXTERN Variant(const std::string& value, const std::string& encoding); QPID_TYPES_EXTERN Variant(const char*); + QPID_TYPES_EXTERN Variant(const char* value, const char* encoding); QPID_TYPES_EXTERN Variant(const Map&); QPID_TYPES_EXTERN Variant(const List&); QPID_TYPES_EXTERN Variant(const Variant&); @@ -156,9 +158,10 @@ class QPID_TYPES_CLASS_EXTERN Variant QPID_TYPES_EXTERN Map& asMap(); QPID_TYPES_EXTERN const List& asList() const; QPID_TYPES_EXTERN List& asList(); + /** - * Unlike asString(), getString() will not do any conversions and - * will throw InvalidConversion if the type is not STRING. + * Unlike asString(), getString() will not do any conversions. + * @exception InvalidConversion if the type is not STRING. */ QPID_TYPES_EXTERN const std::string& getString() const; QPID_TYPES_EXTERN std::string& getString(); @@ -168,9 +171,45 @@ class QPID_TYPES_CLASS_EXTERN Variant QPID_TYPES_EXTERN bool isEqualTo(const Variant& a) const; + /** Reset value to VOID, does not reset the descriptors. */ QPID_TYPES_EXTERN void reset(); + + /** True if there is at least one descriptor associated with this variant. */ + QPID_TYPES_EXTERN bool isDescribed() const; + + /** Get the first descriptor associated with this variant. + * + * Normally there is at most one descriptor, when there are multiple + * descriptors use getDescriptors() + * + *@return The first descriptor or VOID if there is no descriptor. + *@see isDescribed, getDescriptors + */ + QPID_TYPES_EXTERN Variant getDescriptor() const; + + /** Set a single descriptor for this Variant. The descriptor must be a string or integer. */ + QPID_TYPES_EXTERN void setDescriptor(const Variant& descriptor); + + /** Return a modifiable list of descriptors for this Variant. + * Used in case where there are multiple descriptors, for a single descriptor use + * getDescriptor and setDescriptor. + */ + QPID_TYPES_EXTERN List& getDescriptors(); + + /** Return the list of descriptors for this Variant. + * Used in case where there are multiple descriptors, for a single descriptor use + * getDescriptor and setDescriptor. + */ + QPID_TYPES_EXTERN const List& getDescriptors() const; + + /** Create a described value */ + QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, const Variant& value); + + /** Create a described list, a common special case */ + QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, const List& value); + private: - VariantImpl* impl; + mutable VariantImpl* impl; }; #ifndef SWIG Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -6,4 +6,4 @@ /qpid/branches/java-network-refactor/qpid/cpp/src:805429-825319 /qpid/branches/qpid-2935/qpid/cpp/src:1061302-1072333 /qpid/branches/qpid-3346/qpid/cpp/src:1144319-1179855 -/qpid/trunk/qpid/cpp/src:1643238-1659605 +/qpid/trunk/qpid/cpp/src:1643238-1663687 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt Tue Mar 3 14:58:01 2015 @@ -42,7 +42,7 @@ include(CheckIncludeFiles) include(CheckIncludeFileCXX) include(CheckLibraryExists) include(CheckSymbolExists) -include(CheckSizeTNativeType) +include(CheckSizetDistinct) find_package(PkgConfig) find_package(Ruby) @@ -351,7 +351,7 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windo mark_as_advanced(QPID_POLLER) endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) -check_size_t_native_type (QPID_SIZE_T_NATIVE) +check_size_t_distinct (QPID_SIZE_T_DISTINCT) option(BUILD_SASL "Build with Cyrus SASL support" ${SASL_FOUND}) if (BUILD_SASL) Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -7,4 +7,4 @@ /qpid/branches/qpid-2393/qpid/cpp/src/CMakeLists.txt:1375790-1376954 /qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt:1061302-1072333 /qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt:1144319-1179855 -/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1643238-1658732 +/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1643238-1663687 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake Tue Mar 3 14:58:01 2015 @@ -144,6 +144,8 @@ if (BUILD_AMQP) qpid/messaging/amqp/SessionHandle.cpp qpid/messaging/amqp/TcpTransport.h qpid/messaging/amqp/TcpTransport.cpp + qpid/messaging/amqp/Transaction.h + qpid/messaging/amqp/Transaction.cpp ) if (WIN32) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake Tue Mar 3 14:58:01 2015 @@ -56,7 +56,7 @@ #cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H} #cmakedefine HAVE_LOG_AUTHPRIV #cmakedefine HAVE_LOG_FTP -#cmakedefine QPID_SIZE_T_NATIVE +#cmakedefine QPID_SIZE_T_DISTINCT #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION #cmakedefine HAVE_PROTON_EVENTS Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake Tue Mar 3 14:58:01 2015 @@ -39,8 +39,8 @@ else (DEFINED legacystore_force) # # allow legacystore to be built # - message(STATUS "BerkeleyDB for C++ and libaio found, Legacystore support enabled") - set (legacystore_default ON) + message(STATUS "BerkeleyDB for C++ and libaio found, Legacystore support disabled by default (deprecated, use linearstore instead).") + set (legacystore_default OFF) # Disabled, deprecated. Use linearstore instead. else (HAVE_AIO AND HAVE_AIO_H) if (NOT HAVE_AIO) message(STATUS "Legacystore requires libaio which is absent.") Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake Tue Mar 3 14:58:01 2015 @@ -39,8 +39,8 @@ else (DEFINED linearstore_force) # # allow linearstore to be built # - message(STATUS "BerkeleyDB for C++ and libaio found, Linearstore support may be enabled (currently experimental and disabled by default)") - set (linearstore_default OFF) # Temporarily disabled + message(STATUS "BerkeleyDB for C++ and libaio found, Linearstore support enabled.") + set (linearstore_default ON) else (HAVE_AIO AND HAVE_AIO_H) if (NOT HAVE_AIO) message(STATUS "Linearstore requires libaio which is absent.") Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp Tue Mar 3 14:58:01 2015 @@ -146,7 +146,7 @@ template QPID_COMMON_EXTERN po::value_se template QPID_COMMON_EXTERN po::value_semantic* create_value(uint16_t& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(uint32_t& val, const std::string& arg); template QPID_COMMON_EXTERN po::value_semantic* create_value(uint64_t& val, const std::string& arg); -#ifdef QPID_SIZE_T_NATIVE +#ifdef QPID_SIZE_T_DISTINCT template QPID_COMMON_EXTERN po::value_semantic* create_value(size_t& val, const std::string& arg); #endif template QPID_COMMON_EXTERN po::value_semantic* create_value(double& val, const std::string& arg); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp Tue Mar 3 14:58:01 2015 @@ -113,8 +113,10 @@ class UrlParser { const char* at = std::find(i, end, '@'); if (at == end) return false; const char* slash = std::find(i, at, '/'); - url.setUser(string(i, slash)); - const char* pass = (slash == at) ? slash : slash+1; + const char* colon = std::find(i, at, ':'); + const char* sep = std::min(slash, colon); + url.setUser(string(i, sep)); + const char* pass = (sep == at) ? sep : sep+1; url.setPass(string(pass, at)); i = at+1; return true; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp Tue Mar 3 14:58:01 2015 @@ -35,7 +35,7 @@ CharSequence::operator bool() const } std::string CharSequence::str() const { - return std::string(data, size); + return (data && size) ? std::string(data, size) : std::string(); } CharSequence CharSequence::create() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp Tue Mar 3 14:58:01 2015 @@ -19,11 +19,17 @@ * */ #include "Descriptor.h" +#include "descriptors.h" +#include <qpid/framing/reply_exceptions.h> +#include <map> namespace qpid { namespace amqp { + Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; } + Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { value.symbol = symbol; } + bool Descriptor::match(const std::string& symbol, uint64_t code) const { switch (type) { @@ -58,20 +64,85 @@ Descriptor* Descriptor::nest(const Descr return nested.get(); } -std::ostream& operator<<(std::ostream& os, const Descriptor& d) -{ - switch (d.type) { - case Descriptor::SYMBOLIC: - if (d.value.symbol.data && d.value.symbol.size) os << std::string(d.value.symbol.data, d.value.symbol.size); - else os << "null"; - break; - case Descriptor::NUMERIC: - os << "0x" << std::hex << d.value.code; - break; +namespace { + +class DescriptorMap { + typedef std::map<uint64_t, std::string> SymbolMap; + typedef std::map<std::string, uint64_t> CodeMap; + + SymbolMap symbols; + CodeMap codes; + + public: + DescriptorMap() { + symbols[message::HEADER_CODE] = message::HEADER_SYMBOL; + symbols[message::DELIVERY_ANNOTATIONS_CODE] = message::DELIVERY_ANNOTATIONS_SYMBOL; + symbols[message::MESSAGE_ANNOTATIONS_CODE] = message::MESSAGE_ANNOTATIONS_SYMBOL; + symbols[message::PROPERTIES_CODE] = message::PROPERTIES_SYMBOL; + symbols[message::APPLICATION_PROPERTIES_CODE] = message::APPLICATION_PROPERTIES_SYMBOL; + symbols[message::DATA_CODE] = message::DATA_SYMBOL; + symbols[message::AMQP_SEQUENCE_CODE] = message::AMQP_SEQUENCE_SYMBOL; + symbols[message::AMQP_VALUE_CODE] = message::AMQP_VALUE_SYMBOL; + symbols[message::FOOTER_CODE] = message::FOOTER_SYMBOL; + symbols[message::ACCEPTED_CODE] = message::ACCEPTED_SYMBOL; + symbols[sasl::SASL_MECHANISMS_CODE] = sasl::SASL_MECHANISMS_SYMBOL; + symbols[sasl::SASL_INIT_CODE] = sasl::SASL_INIT_SYMBOL; + symbols[sasl::SASL_CHALLENGE_CODE] = sasl::SASL_CHALLENGE_SYMBOL; + symbols[sasl::SASL_RESPONSE_CODE] = sasl::SASL_RESPONSE_SYMBOL; + symbols[sasl::SASL_OUTCOME_CODE] = sasl::SASL_OUTCOME_SYMBOL; + symbols[filters::LEGACY_DIRECT_FILTER_CODE] = filters::LEGACY_DIRECT_FILTER_SYMBOL; + symbols[filters::LEGACY_TOPIC_FILTER_CODE] = filters::LEGACY_TOPIC_FILTER_SYMBOL; + symbols[filters::LEGACY_HEADERS_FILTER_CODE] = filters::LEGACY_HEADERS_FILTER_SYMBOL; + symbols[filters::SELECTOR_FILTER_CODE] = filters::SELECTOR_FILTER_SYMBOL; + symbols[filters::XQUERY_FILTER_CODE] = filters::XQUERY_FILTER_SYMBOL; + symbols[lifetime_policy::DELETE_ON_CLOSE_CODE] = lifetime_policy::DELETE_ON_CLOSE_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_LINKS_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL; + symbols[lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE] = lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL; + symbols[transaction::DECLARE_CODE] = transaction::DECLARE_SYMBOL; + symbols[transaction::DISCHARGE_CODE] = transaction::DISCHARGE_SYMBOL; + symbols[transaction::DECLARED_CODE] = transaction::DECLARED_SYMBOL; + symbols[transaction::TRANSACTIONAL_STATE_CODE] = transaction::TRANSACTIONAL_STATE_SYMBOL; + symbols[0] = "unknown-descriptor"; + + for (SymbolMap::const_iterator i = symbols.begin(); i != symbols.end(); ++i) + codes[i->second] = i->first; + } + + std::string operator[](uint64_t code) const { + SymbolMap::const_iterator i = symbols.find(code); + return (i == symbols.end()) ? "unknown-descriptor" : i->second; } - if (d.nested.get()) { - os << " ->(" << *d.nested << ")"; + + uint64_t operator[](const std::string& symbol) const { + CodeMap::const_iterator i = codes.find(symbol); + return (i == codes.end()) ? 0 : i->second; + } +}; + +DescriptorMap DESCRIPTOR_MAP; +} + +std::string Descriptor::symbol() const { + switch (type) { + case Descriptor::NUMERIC: return DESCRIPTOR_MAP[value.code]; + case Descriptor::SYMBOLIC: return value.symbol.str(); + } + assert(0); + return std::string(); +} + +uint64_t Descriptor::code() const { + switch (type) { + case Descriptor::NUMERIC: return value.code; + case Descriptor::SYMBOLIC: return DESCRIPTOR_MAP[value.symbol.str()]; } - return os; + assert(0); + return 0; } + +std::ostream& operator<<(std::ostream& os, const Descriptor& d) { + return os << d.symbol() << "(" << "0x" << std::hex << d.code() << ")"; +} + }} // namespace qpid::amqp Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h Tue Mar 3 14:58:01 2015 @@ -49,6 +49,8 @@ struct Descriptor QPID_COMMON_EXTERN bool match(const std::string&, uint64_t) const; QPID_COMMON_EXTERN size_t getSize() const; QPID_COMMON_EXTERN Descriptor* nest(const Descriptor& d); + QPID_COMMON_EXTERN std::string symbol() const; + QPID_COMMON_EXTERN uint64_t code() const; }; QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& os, const Descriptor& d); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp Tue Mar 3 14:58:01 2015 @@ -31,10 +31,17 @@ #include <string.h> using namespace qpid::types::encodings; +using qpid::types::Variant; namespace qpid { namespace amqp { +Encoder::Overflow::Overflow() : Exception("Buffer overflow in encoder!") {} + +Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0), grow(false) {} + +Encoder::Encoder() : data(0), size(0), position(0), grow(true) {} + namespace { template <typename T> size_t encode(char* data, T i); template <> size_t encode<uint8_t>(char* data, uint8_t i) @@ -406,6 +413,18 @@ void Encoder::writeList(const std::list< void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* d) { + if (d) { + writeDescriptor(*d); // Write this descriptor before any in the value. + d = 0; + } + // Write any descriptors attached to the value. + const Variant::List& descriptors = value.getDescriptors(); + for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) { + if (i->getType() == types::VAR_STRING) + writeDescriptor(Descriptor(CharSequence::create(i->asString()))); + else + writeDescriptor(Descriptor(i->asUint64())); + } switch (value.getType()) { case qpid::types::VAR_VOID: writeNull(d); @@ -477,18 +496,28 @@ void Encoder::writeDescriptor(const Desc break; } } + void Encoder::check(size_t s) { if (position + s > size) { - QPID_LOG(notice, "Buffer overflow for write of size " << s << " to buffer of size " << size << " at position " << position); - assert(false); - throw qpid::Exception("Buffer overflow in encoder!"); + if (grow) { + buffer.resize(buffer.size() + s); + data = const_cast<char*>(buffer.data()); + size = buffer.size(); + } + else { + QPID_LOG(notice, "Buffer overflow for write of size " << s + << " to buffer of size " << size << " at position " << position); + assert(false); + throw Overflow(); + } } } -Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0) {} + size_t Encoder::getPosition() { return position; } size_t Encoder::getSize() const { return size; } char* Encoder::getData() { return data + position; } +std::string Encoder::getBuffer() { return buffer; } void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; } }} // namespace qpid::amqp Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h Tue Mar 3 14:58:01 2015 @@ -23,6 +23,7 @@ */ #include "qpid/sys/IntegerTypes.h" #include "qpid/amqp/Constructor.h" +#include "qpid/Exception.h" #include <list> #include <map> #include <stddef.h> @@ -43,6 +44,18 @@ struct Descriptor; class Encoder { public: + struct Overflow : public Exception { Overflow(); }; + + /** Create an encoder that writes into the buffer at data up to size bytes. + * Write operations throw Overflow if encoding exceeds size bytes. + */ + QPID_COMMON_EXTERN Encoder(char* data, size_t size); + + /** Create an encoder that manages its own buffer. Buffer grows to accomodate + * all encoded data. Call getBuffer() to get the buffer. + */ + QPID_COMMON_EXTERN Encoder(); + void writeCode(uint8_t); void write(bool); @@ -100,19 +113,27 @@ class Encoder QPID_COMMON_EXTERN void writeList(const std::list<qpid::types::Variant>& value, const Descriptor* d=0, bool large=true); void writeDescriptor(const Descriptor&); - QPID_COMMON_EXTERN Encoder(char* data, size_t size); QPID_COMMON_EXTERN size_t getPosition(); void resetPosition(size_t p); char* skip(size_t); void writeBytes(const char* bytes, size_t count); virtual ~Encoder() {} + + /** Return the total size of the buffer. */ size_t getSize() const; - protected: + + /** Return the growable buffer. */ + std::string getBuffer(); + + /** Return the unused portion of the buffer. */ char* getData(); + private: char* data; size_t size; size_t position; + bool grow; + std::string buffer; void write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); void write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const Descriptor* d); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h Tue Mar 3 14:58:01 2015 @@ -26,6 +26,9 @@ namespace qpid { namespace amqp { +// NOTE: If you add descriptor symbols and codes here, you must also update the DescriptorMap +// constructor in Descriptor.cpp. + namespace message { const std::string HEADER_SYMBOL("amqp:header:list"); const std::string PROPERTIES_SYMBOL("amqp:properties:list"); @@ -36,6 +39,7 @@ const std::string AMQP_SEQUENCE_SYMBOL(" const std::string AMQP_VALUE_SYMBOL("amqp:amqp-value:*"); const std::string DATA_SYMBOL("amqp:data:binary"); const std::string FOOTER_SYMBOL("amqp:footer:map"); +const std::string ACCEPTED_SYMBOL("amqp:accepted:list"); const uint64_t HEADER_CODE(0x70); const uint64_t DELIVERY_ANNOTATIONS_CODE(0x71); @@ -46,6 +50,7 @@ const uint64_t DATA_CODE(0x75); const uint64_t AMQP_SEQUENCE_CODE(0x76); const uint64_t AMQP_VALUE_CODE(0x77); const uint64_t FOOTER_CODE(0x78); +const uint64_t ACCEPTED_CODE(0x24); const Descriptor HEADER(HEADER_CODE); const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE); Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Mar 3 14:58:01 2015 @@ -7,4 +7,4 @@ /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker:1061302-1072333 /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker:1144319-1179855 /qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker:1299027-1303795 -/qpid/trunk/qpid/cpp/src/qpid/broker:1643238-1658732 +/qpid/trunk/qpid/cpp/src/qpid/broker:1643238-1663687 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp Tue Mar 3 14:58:01 2015 @@ -297,6 +297,8 @@ void Queue::deliverTo(Message msg, TxBuf if (txn) { TxOp::shared_ptr op(new TxPublish(msg, shared_from_this())); txn->enlist(op); + QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name + << " enlisted in " << txn); } else { if (enqueue(0, msg)) { push(msg); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Tue Mar 3 14:58:01 2015 @@ -165,8 +165,8 @@ Connection::~Connection() { if (ticker) ticker->cancel(); getBroker().getConnectionObservers().closed(*this); - pn_transport_free(transport); pn_connection_free(connection); + pn_transport_free(transport); #ifdef HAVE_PROTON_EVENTS pn_collector_free(collector); #endif Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h Tue Mar 3 14:58:01 2015 @@ -22,6 +22,7 @@ * */ #include <string> +#include <qpid/Exception.h> namespace qpid { namespace broker { @@ -29,7 +30,7 @@ namespace amqp { /** * Exception to signal various AMQP 1.0 defined conditions */ -class Exception : public std::exception +class Exception : public qpid::Exception { public: Exception(const std::string& name, const std::string& description); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Tue Mar 3 14:58:01 2015 @@ -100,6 +100,7 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session)); return copy; } + private: pn_delivery_t* delivery; boost::shared_ptr<Session> session; @@ -146,8 +147,8 @@ void DecodingIncoming::deliver(boost::in { qpid::broker::Message message(received, received); userid.verify(message.getUserId()); - handle(message, session.getTransaction(delivery)); received->begin(); + handle(message, session.getTransaction(delivery)); Transfer t(delivery, sessionPtr); received->end(t); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Tue Mar 3 14:58:01 2015 @@ -28,6 +28,7 @@ #include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/amqp/MessageEncoder.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" @@ -90,13 +91,13 @@ bool OutgoingFromQueue::doWork() return true; } else { pn_link_drained(link); - QPID_LOG(debug, "No message available on " << queue->getName()); + QPID_LOG(trace, "No message available on " << queue->getName()); } } catch (const qpid::framing::ResourceDeletedException& e) { throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, e.what()); } } else { - QPID_LOG(debug, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); + QPID_LOG(trace, "Can't deliver to " << getName() << " from " << queue->getName() << ": " << pn_link_credit(link)); } return false; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Mar 3 14:58:01 2015 @@ -61,6 +61,8 @@ namespace qpid { namespace broker { namespace amqp { +using namespace qpid::amqp::transaction; + namespace { pn_bytes_t convert(const std::string& s) { @@ -209,6 +211,7 @@ class IncomingToCoordinator : public Dec public: IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent) : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", pn_link_name(link)) {} + ~IncomingToCoordinator() { session.abort(); } void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, pn_delivery_t*); void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {} @@ -218,7 +221,9 @@ class IncomingToCoordinator : public Dec Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), authorise(connection.getUserId(), connection.getBroker().getAcl()), - detachRequested(), txnId((boost::format("%1%") % s).str()) {} + detachRequested(), + tx(*this) +{} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -383,6 +388,7 @@ void Session::attach(pn_link_t* link) //i.e a subscription std::string name; if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { + pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No source specified!"); } else if (pn_terminus_is_dynamic(source)) { name = generateName(link); @@ -399,6 +405,7 @@ void Session::attach(pn_link_t* link) pn_terminus_t* target = pn_link_remote_target(link); std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { + pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); } else if (pn_terminus_get_type(target) == PN_COORDINATOR) { QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this); @@ -634,11 +641,12 @@ void Session::readable(pn_link_t* link, if (target->second->haveWork()) out.activateOutput(); } } + void Session::writable(pn_link_t* link, pn_delivery_t* delivery) { OutgoingLinks::iterator sender = outgoing.find(link); if (sender == outgoing.end()) { - QPID_LOG(error, "Delivery returned for unknown link"); + QPID_LOG(error, "Delivery returned for unknown link " << pn_link_name(link)); } else { sender->second->handle(delivery); } @@ -647,7 +655,7 @@ void Session::writable(pn_link_t* link, bool Session::dispatch() { bool output(false); - if (commitPending.boolCompareAndSwap(true, false)) { + if (tx.commitPending.boolCompareAndSwap(true, false)) { committed(true); } for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) { @@ -735,7 +743,7 @@ void Session::detachedByManagement() TxBuffer* Session::getTransaction(const std::string& id) { - return (txn.get() && id == txnId) ? txn.get() : 0; + return (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0; } TxBuffer* Session::getTransaction(pn_delivery_t* delivery) @@ -746,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_del std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* delivery) { std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0); - if (pn_delivery_remote_state(delivery) == qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) { + if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) { pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery)); - if (data && pn_data_next(data)) { - size_t count = pn_data_get_list(data); - if (count > 0) { + pn_data_rewind(data); + size_t count = 0; + if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 0) { + pn_data_enter(data); + pn_data_next(data); + std::string id = convert(pn_data_get_binary(data)); + result.first = getTransaction(id); + if (!result.first) { + QPID_LOG(error, "Transaction not found for id: " << id); + } + if (count > 1 && pn_data_next(data)) { pn_data_enter(data); pn_data_next(data); - std::string id = convert(pn_data_get_binary(data)); - result.first = getTransaction(id); - if (!result.first) { - QPID_LOG(error, "Transaction not found for id: " << id); - } - if (count > 1 && pn_data_next(data) && pn_data_is_described(data)) { - pn_data_enter(data); - pn_data_next(data); - result.second = pn_data_get_ulong(data); - } - pn_data_exit(data); + result.second = pn_data_get_ulong(data); } - } else { - QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } + else + QPID_LOG(error, "Transactional delivery " << delivery << " appears to have no data"); } return result; } std::string Session::declare() { - if (txn.get()) { + if (tx.buffer.get()) { //not sure what the error code should be; none in spec really fit well. - throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "Session only supports one transaction active at a time"); + throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, + "Session only supports one transaction active at a time"); } - txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); - connection.getBroker().getBrokerObservers().startTx(txn); + tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); + connection.getBroker().getBrokerObservers().startTx(tx.buffer); txStarted(); - return txnId; + return tx.id; } namespace { @@ -795,32 +802,41 @@ namespace { boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new AsyncCommit(session)); return copy; } + private: boost::shared_ptr<Session> session; }; } -void Session::discharge(const std::string& id, bool failed) +void Session::discharge(const std::string& id, bool failed, pn_delivery_t* delivery) { - if (!txn.get() || id != txnId) { - throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, "No transaction declared with that id"); + QPID_LOG(debug, "Coordinator " << (failed ? " rollback" : " commit") + << " transaction " << id); + if (!tx.buffer.get() || id != tx.id) { + throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, + Msg() << "Cannot discharge transaction " << id + << (tx.buffer.get() ? Msg() << ", current transaction is " << tx.id : + Msg() << ", no current transaction")); } + tx.discharge = delivery; if (failed) { abort(); } else { - txn->begin(); - txn->startCommit(&connection.getBroker().getStore()); + tx.buffer->begin(); + tx.buffer->startCommit(&connection.getBroker().getStore()); AsyncCommit callback(shared_from_this()); - txn->end(callback); + tx.buffer->end(callback); } } void Session::abort() { - if (txn) { - txn->rollback(); + if (tx.buffer) { + tx.dischargeComplete(); + tx.buffer->rollback(); txAborted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " rolled back"); } } @@ -828,16 +844,18 @@ void Session::committed(bool sync) { if (sync) { //this is on IO thread - if (txn.get()) { - txn->endCommit(&connection.getBroker().getStore()); + tx.dischargeComplete(); + if (tx.buffer.get()) { + tx.buffer->endCommit(&connection.getBroker().getStore()); txCommitted(); - txn = boost::intrusive_ptr<TxBuffer>(); + tx.buffer.reset(); + QPID_LOG(debug, "Transaction " << tx.id << " comitted"); } else { throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction vanished during async commit"); } } else { //this is not on IO thread, need to delay processing until on IO thread - if (commitPending.boolCompareAndSwap(false, true)) { + if (tx.commitPending.boolCompareAndSwap(false, true)) { qpid::sys::Mutex::ScopedLock l(lock); if (!deleted) { out.activateOutput(); @@ -878,7 +896,7 @@ void IncomingToCoordinator::deliver(boos { if (message && message->isTypedBody()) { QPID_LOG(debug, "Coordinator got message: @" << message->getBodyDescriptor() << " " << message->getTypedBody()); - if (message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, qpid::amqp::transaction::DECLARE_CODE)) { + if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) { std::string id = session.declare(); //encode the txn id in a 'declared' list on the disposition pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery)); @@ -887,22 +905,38 @@ void IncomingToCoordinator::deliver(boos pn_data_put_binary(data, convert(id)); pn_data_exit(data); pn_data_exit(data); - pn_delivery_update(delivery, qpid::amqp::transaction::DECLARED_CODE); + pn_delivery_update(delivery, DECLARED_CODE); pn_delivery_settle(delivery); session.incomingMessageAccepted(); - } else if (message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, qpid::amqp::transaction::DISCHARGE_CODE)) { + QPID_LOG(debug, "Coordinator declared transaction " << id); + } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, DISCHARGE_CODE)) { if (message->getTypedBody().getType() == qpid::types::VAR_LIST) { qpid::types::Variant::List args = message->getTypedBody().asList(); qpid::types::Variant::List::const_iterator i = args.begin(); if (i != args.end()) { std::string id = *i; bool failed = ++i != args.end() ? i->asBool() : false; - session.discharge(id, failed); - DecodingIncoming::deliver(message, delivery);//ensures async completion of commit is taken care of + session.discharge(id, failed, delivery); } + + } else { + throw framing::IllegalArgumentException( + Msg() << "Coordinator unknown message: @" << + message->getBodyDescriptor() << " " << message->getTypedBody()); } } } } +Session::Transaction::Transaction(Session& s) : + session(s), id((boost::format("%1%") % &s).str()), discharge(0) {} + +// Called in IO thread to signal completion of dischage by settling discharge message. +void Session::Transaction::dischargeComplete() { + if (buffer.get() && discharge) { + session.accepted(discharge, false); // Queue up accept and activate output. + discharge = 0; + } +} + }}} // namespace qpid::broker::amqp Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h Tue Mar 3 14:58:01 2015 @@ -91,7 +91,7 @@ class Session : public ManagedSession, p std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*); //transaction coordination: std::string declare(); - void discharge(const std::string& id, bool failed); + void discharge(const std::string& id, bool failed, pn_delivery_t*); void abort(); protected: void detachedByManagement(); @@ -109,9 +109,18 @@ class Session : public ManagedSession, p std::set< boost::shared_ptr<Queue> > exclusiveQueues; Authorise authorise; bool detachRequested; - boost::intrusive_ptr<TxBuffer> txn; - std::string txnId; - qpid::sys::AtomicValue<bool> commitPending; + + struct Transaction { + Transaction(Session&); + void dischargeComplete(); + + Session& session; + boost::intrusive_ptr<TxBuffer> buffer; + std::string id; + qpid::sys::AtomicValue<bool> commitPending; + pn_delivery_t* discharge; + }; + Transaction tx; struct ResolvedNode { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Tue Mar 3 14:58:01 2015 @@ -205,7 +205,7 @@ void ConnectionHandler::fail(const std:: { errorCode = CLOSE_CODE_FRAMING_ERROR; errorText = message; - QPID_LOG(warning, message); + QPID_LOG(debug, message); setState(FAILED); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Tue Mar 3 14:58:01 2015 @@ -510,9 +510,9 @@ void AddressHelper::checkAssertion(pn_te requested.erase(j->first); } } else if (key == AUTO_DELETE) { - PnData(data).read(v); + PnData(data).get(v); isAutoDeleted = v.asBool(); - } else if (j != requested.end() && (PnData(data).read(v) && v.asString() == j->second.asString())) { + } else if (j != requested.end() && (PnData(data).get(v) && v.asString() == j->second.asString())) { requested.erase(j->first); } } @@ -646,7 +646,7 @@ void AddressHelper::configure(pn_link_t* } else { pn_data_put_ulong(filter, i->descriptorCode); } - PnData(filter).write(i->value); + PnData(filter).put(i->value); pn_data_exit(filter); } pn_data_exit(filter); @@ -733,7 +733,7 @@ void AddressHelper::setNodeProperties(pn putLifetimePolicy(data, toLifetimePolicy(i->second.asString())); } else { pn_data_put_symbol(data, convert(i->first)); - PnData(data).write(i->second); + PnData(data).put(i->second); } } pn_data_exit(data); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Mar 3 14:58:01 2015 @@ -25,8 +25,11 @@ #include "Sasl.h" #include "SenderContext.h" #include "SessionContext.h" +#include "Transaction.h" #include "Transport.h" #include "qpid/amqp/descriptors.h" +#include "qpid/amqp/Encoder.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/messaging/exceptions.h" #include "qpid/messaging/AddressImpl.h" #include "qpid/messaging/Duration.h" @@ -43,6 +46,7 @@ #include "qpid/sys/urlAdd.h" #include "config.h" #include <boost/lexical_cast.hpp> +#include <boost/bind.hpp> #include <vector> extern "C" { #include <proton/engine.h> @@ -151,20 +155,23 @@ ConnectionContext::~ConnectionContext() if (ticker) ticker->cancel(); close(); sessions.clear(); - pn_transport_free(engine); pn_connection_free(connection); + pn_transport_free(engine); } bool ConnectionContext::isOpen() const { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return state == CONNECTED && pn_connection_state(connection) & (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - //wait for outstanding sends to settle + sys::Monitor::ScopedLock l(lock); + syncLH(ssn, l); +} + +void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&) { while (!ssn->settled()) { QPID_LOG(debug, "Waiting for sends to settle on sync()"); wait(ssn);//wait until message has been confirmed @@ -175,18 +182,13 @@ void ConnectionContext::sync(boost::shar void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { //explicitly release messages that have yet to be fetched for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); i != ssn->receivers.end(); ++i) { drain_and_release_messages(ssn, i->second); } - //wait for outstanding sends to settle - while (!ssn->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(ssn);//wait until message has been confirmed - wakeupDriver(); - } + syncLH(ssn, l); } if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) { @@ -199,17 +201,11 @@ void ConnectionContext::endSession(boost void ConnectionContext::close() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != CONNECTED) return; if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { - //wait for outstanding sends to settle - while (!i->second->settled()) { - QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait(i->second);//wait until message has been confirmed - } - - + syncLH(i->second, l); if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { pn_session_close(i->second->session); } @@ -246,7 +242,7 @@ bool ConnectionContext::fetch(boost::sha */ qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching); { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn, lnk); if (!lnk->capacity) { pn_link_flow(lnk->receiver, 1); @@ -257,10 +253,10 @@ bool ConnectionContext::fetch(boost::sha return true; } else { { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); pn_link_drain(lnk->receiver, 0); wakeupDriver(); - while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { + while (pn_link_draining(lnk->receiver) && !pn_link_queued(lnk->receiver)) { QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); wait(ssn, lnk); } @@ -269,7 +265,7 @@ bool ConnectionContext::fetch(boost::sha } } if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); wakeupDriver(); @@ -296,7 +292,7 @@ bool ConnectionContext::get(boost::share { qpid::sys::AbsTime until(convert(timeout)); while (true) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn, lnk); pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); @@ -320,6 +316,9 @@ bool ConnectionContext::get(boost::share haveOutput = true; } } + // Automatically ack messages if we are in a transaction. + if (ssn->transaction) + acknowledgeLH(ssn, &message, false, l); return true; } else if (until > qpid::sys::now()) { waitUntil(ssn, lnk, until); @@ -334,7 +333,7 @@ boost::shared_ptr<ReceiverContext> Conne { qpid::sys::AbsTime until(convert(timeout)); while (true) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn); boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver(); if (r) { @@ -347,9 +346,13 @@ boost::shared_ptr<ReceiverContext> Conne } } -void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) +void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { + sys::Monitor::ScopedLock l(lock); + acknowledgeLH(ssn, message, cumulative, l); +} + +void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); if (message) { ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); @@ -361,7 +364,7 @@ void ConnectionContext::acknowledge(boos void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); checkClosed(ssn); ssn->nack(MessageImplAccess::get(message).getInternalId(), reject); wakeupDriver(); @@ -369,7 +372,7 @@ void ConnectionContext::nack(boost::shar void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) { lnk->close(); } @@ -401,7 +404,7 @@ void ConnectionContext::drain_and_releas void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); drain_and_release_messages(ssn, lnk); if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) { lnk->close(); @@ -415,7 +418,7 @@ void ConnectionContext::detach(boost::sh void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->sender); checkClosed(ssn, lnk); @@ -425,7 +428,7 @@ void ConnectionContext::attach(boost::sh void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); lnk->configure(); attach(ssn, lnk->receiver, lnk->capacity); checkClosed(ssn, lnk); @@ -445,11 +448,26 @@ void ConnectionContext::attach(boost::sh } } -void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +void ConnectionContext::send( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery) +{ + sys::Monitor::ScopedLock l(lock); + sendLH(ssn, snd, message, sync, delivery, l); +} + +void ConnectionContext::sendLH( + boost::shared_ptr<SessionContext> ssn, + boost::shared_ptr<SenderContext> snd, + const qpid::messaging::Message& message, + bool sync, + SenderContext::Delivery** delivery, + sys::Monitor::ScopedLock&) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); checkClosed(ssn); - SenderContext::Delivery* delivery(0); while (pn_transport_pending(engine) > 65536) { QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of output pending; waiting for this to be written..."); notifyOnWrite = true; @@ -457,17 +475,17 @@ void ConnectionContext::send(boost::shar wait(ssn, snd); notifyOnWrite = false; } - while (!snd->send(message, &delivery)) { + while (!snd->send(message, delivery)) { QPID_LOG(debug, "Waiting for capacity..."); wait(ssn, snd);//wait for capacity } wakeupDriver(); - if (sync && delivery) { - while (!delivery->delivered()) { + if (sync && *delivery) { + while (!(*delivery)->delivered()) { QPID_LOG(debug, "Waiting for confirmation..."); wait(ssn, snd);//wait until message has been confirmed } - if (delivery->rejected()) { + if ((*delivery)->rejected()) { throw MessageRejected("Message was rejected by peer"); } @@ -476,46 +494,46 @@ void ConnectionContext::send(boost::shar void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, uint32_t capacity) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); sender->setCapacity(capacity); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> sender) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return sender->getCapacity(); } uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> sender) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return sender->getUnsettled(); } void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> receiver, uint32_t capacity) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); receiver->setCapacity(capacity); pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity()); wakeupDriver(); } uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getCapacity(); } uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getAvailable(); } uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> receiver) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return receiver->getUnsettled(); } void ConnectionContext::activateOutput() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state == CONNECTED) wakeupDriver(); } /** @@ -543,8 +561,8 @@ pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | void ConnectionContext::reset() { - pn_transport_free(engine); pn_connection_free(connection); + pn_transport_free(engine); engine = pn_transport(); connection = pn_connection(); @@ -555,7 +573,7 @@ void ConnectionContext::reset() } } -void ConnectionContext::check() { +bool ConnectionContext::check() { if (checkDisconnected()) { if (ConnectionOptions::reconnect) { QPID_LOG(notice, "Auto-reconnecting to " << fullUrl); @@ -564,7 +582,9 @@ void ConnectionContext::check() { } else { throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); } + return true; } + return false; } bool ConnectionContext::checkDisconnected() { @@ -588,7 +608,7 @@ bool ConnectionContext::checkDisconnecte void ConnectionContext::wait() { - check(); + if (check()) return; // Reconnected, may need to re-test condition. lock.wait(); check(); } @@ -630,6 +650,7 @@ void ConnectionContext::waitUntil(boost: void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) { check(); + ssn->error.raise(); if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { pn_condition_t* error = pn_session_remote_condition(ssn->session); std::stringstream text; @@ -690,6 +711,7 @@ void ConnectionContext::checkClosed(boos void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) { + if (s->error) return; pn_session_open(s->session); wakeupDriver(); while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { @@ -718,26 +740,31 @@ void ConnectionContext::restartSession(b boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (transactional) throw qpid::messaging::MessagingException("Transactions not yet supported"); + boost::shared_ptr<SessionContext> session; std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n; - SessionMap::const_iterator i = sessions.find(name); - if (i == sessions.end()) { - boost::shared_ptr<SessionContext> s(new SessionContext(connection)); - s->setName(name); - s->session = pn_session(connection); - pn_session_open(s->session); - wakeupDriver(); - while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { - wait(); + { + sys::Monitor::ScopedLock l(lock); + SessionMap::const_iterator i = sessions.find(name); + if (i == sessions.end()) { + session = boost::shared_ptr<SessionContext>(new SessionContext(connection)); + session->setName(name); + pn_session_open(session->session); + wakeupDriver(); + sessions[name] = session; // Add it now so it will be restarted if we reconnect in wait() + while (pn_session_state(session->session) & PN_REMOTE_UNINIT) { + wait(); + } + } else { + throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); } - sessions[name] = s; - return s; - } else { - throw qpid::messaging::KeyError(std::string("Session already exists: ") + name); - } + } + if (transactional) { // Outside of lock + startTxSession(session); + } + return session; } + boost::shared_ptr<SessionContext> ConnectionContext::getSession(const std::string& name) const { SessionMap::const_iterator i = sessions.find(name); @@ -760,7 +787,7 @@ std::string ConnectionContext::getAuthen std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); QPID_LOG(trace, id << " decode(" << size << ")"); if (readHeader) { size_t decoded = readProtocolHeader(buffer, size); @@ -805,7 +832,7 @@ std::size_t ConnectionContext::decodePla } std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); QPID_LOG(trace, id << " encode(" << size << ")"); if (writeHeader) { size_t encoded = writeProtocolHeader(buffer, size); @@ -843,19 +870,19 @@ std::size_t ConnectionContext::encodePla } bool ConnectionContext::canEncodePlain() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); return haveOutput && state == CONNECTED; } void ConnectionContext::closed() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); state = DISCONNECTED; lock.notifyAll(); } void ConnectionContext::opened() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); state = CONNECTED; lock.notifyAll(); } @@ -921,7 +948,7 @@ const qpid::messaging::ConnectionOptions std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); size_t decoded = 0; try { if (sasl.get() && !sasl->authenticated()) { @@ -939,7 +966,7 @@ std::size_t ConnectionContext::decode(co } std::size_t ConnectionContext::encode(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); size_t encoded = 0; try { if (sasl.get() && sasl->canEncode()) { @@ -957,7 +984,7 @@ std::size_t ConnectionContext::encode(ch } bool ConnectionContext::canEncode() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (sasl.get()) { try { if (sasl->canEncode()) return true; @@ -978,26 +1005,21 @@ const std::string CLIENT_PPID("qpid.clie } void ConnectionContext::setProperties() { - pn_data_t* data = pn_connection_properties(connection); - pn_data_put_map(data); - pn_data_enter(data); - - pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME)); - std::string processName = sys::SystemInfo::getProcessName(); - pn_data_put_string(data, PnData::str(processName)); - - pn_data_put_symbol(data, PnData::str(CLIENT_PID)); - pn_data_put_int(data, sys::SystemInfo::getProcessId()); - - pn_data_put_symbol(data, PnData::str(CLIENT_PPID)); - pn_data_put_int(data, sys::SystemInfo::getParentProcessId()); - + PnData data(pn_connection_properties(connection)); + pn_data_put_map(data.data); + pn_data_enter(data.data); + data.putSymbol(CLIENT_PROCESS_NAME); + data.putSymbol(sys::SystemInfo::getProcessName()); + data.putSymbol(CLIENT_PID); + data.put(int32_t(sys::SystemInfo::getProcessId())); + data.putSymbol(CLIENT_PPID); + data.put(int32_t(sys::SystemInfo::getParentProcessId())); for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - pn_data_put_symbol(data, PnData::str(i->first)); - PnData(data).write(i->second); + data.putSymbol(i->first); + data.put(i->second); } - pn_data_exit(data); + pn_data_exit(data.data); } const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettings() @@ -1007,7 +1029,7 @@ const qpid::sys::SecuritySettings* Conne void ConnectionContext::open() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); QPID_LOG(info, "Starting connection to " << fullUrl); @@ -1049,7 +1071,7 @@ void ConnectionContext::autoconnect() void ConnectionContext::reconnect(const Url& url) { QPID_LOG(notice, "Reconnecting to " << url); - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); if (!driver) driver = DriverImpl::getDefault(); reset(); @@ -1137,7 +1159,7 @@ bool ConnectionContext::tryOpenAddr(cons std::string ConnectionContext::getUrl() const { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + sys::Monitor::ScopedLock l(lock); return (state == CONNECTED) ? currentUrl.str() : std::string(); } @@ -1209,6 +1231,40 @@ bool ConnectionContext::CodecAdapter::ca return context.canEncodePlain(); } +void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> session) { + try { + QPID_LOG(debug, id << " attaching transaction for " << session->getName()); + boost::shared_ptr<Transaction> tx(new Transaction(session->session)); + session->transaction = tx; + attach(session, tx); + tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, _4, _5), session); + } catch (const Exception& e) { + throw TransactionError(Msg() << "Cannot start transaction: " << e.what()); + } +} + +void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, bool fail) { + { + sys::Monitor::ScopedLock l(lock); + checkClosed(session); + if (!session->transaction) + throw TransactionError("No Transaction"); + Transaction::SendFunction sendFn = boost::bind( + &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, boost::ref(l)); + syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers have been received. + session->transaction->discharge(sendFn, session, fail); + session->transaction->declare(sendFn, session); + } +} + +void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) { + discharge(session, false); +} + +void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) { + discharge(session, true); +} + // setup the transport and connection objects: void ConnectionContext::configureConnection() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1663719&r1=1663718&r2=1663719&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Tue Mar 3 14:58:01 2015 @@ -34,6 +34,7 @@ #include "qpid/sys/Monitor.h" #include "qpid/types/Variant.h" #include "qpid/messaging/amqp/TransportContext.h" +#include "SenderContext.h" struct pn_connection_t; struct pn_link_t; @@ -59,7 +60,6 @@ class DriverImpl; class ReceiverContext; class Sasl; class SessionContext; -class SenderContext; class Transport; /** @@ -82,10 +82,20 @@ class ConnectionContext : public qpid::s void detach(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void drain_and_release_messages(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); bool isClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); - void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, bool sync); + + // Link operations + void send(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery); + bool fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); bool get(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + + // Session operations void acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative); + void commit(boost::shared_ptr<SessionContext> ssn); + void rollback(boost::shared_ptr<SessionContext> ssn); + void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& message, bool reject); void sync(boost::shared_ptr<SessionContext> ssn); boost::shared_ptr<ReceiverContext> nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration timeout); @@ -93,10 +103,10 @@ class ConnectionContext : public qpid::s void setOption(const std::string& name, const qpid::types::Variant& value); std::string getAuthenticatedUsername(); + // Link operations void setCapacity(boost::shared_ptr<SenderContext>, uint32_t); uint32_t getCapacity(boost::shared_ptr<SenderContext>); uint32_t getUnsettled(boost::shared_ptr<SenderContext>); - void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t); uint32_t getCapacity(boost::shared_ptr<ReceiverContext>); uint32_t getAvailable(boost::shared_ptr<ReceiverContext>); @@ -159,9 +169,12 @@ class ConnectionContext : public qpid::s bool notifyOnWrite; boost::intrusive_ptr<qpid::sys::TimerTask> ticker; - void check(); + bool check(); bool checkDisconnected(); void waitNoReconnect(); + + // NOTE: All wait*() functions must be called in a loop that checks for the + // waited condition with the lock held. void wait(); void waitUntil(qpid::sys::AbsTime until); void wait(boost::shared_ptr<SessionContext>); @@ -170,10 +183,12 @@ class ConnectionContext : public qpid::s void wait(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until); void waitUntil(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until); + void checkClosed(boost::shared_ptr<SessionContext>); void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<ReceiverContext>); void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); + void wakeupDriver(); void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0); void autoconnect(); @@ -194,8 +209,18 @@ class ConnectionContext : public qpid::s std::string getError(); bool useSasl(); void setProperties(); + void configureConnection(); bool checkTransportError(std::string&); + + void discharge(boost::shared_ptr<SessionContext>, bool fail); + void startTxSession(boost::shared_ptr<SessionContext>); + + void syncLH(boost::shared_ptr<SessionContext> ssn, sys::Monitor::ScopedLock&); + void sendLH(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext> ctxt, + const qpid::messaging::Message& message, bool sync, + SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&); + void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&); }; }}} // namespace qpid::messaging::amqp --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org