Author: kwall
Date: Tue Mar  3 14:58:01 2015
New Revision: 1663719

URL: http://svn.apache.org/r1663719
Log:
merge from trunk

Added:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake
      - copied unchanged from r1663687, 
qpid/trunk/qpid/cpp/CMakeModules/CheckSizetDistinct.cmake
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp
      - copied unchanged from r1663687, 
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transaction.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/Transaction.h
      - copied unchanged from r1663687, 
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/Transaction.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/aix/
      - copied from r1663687, qpid/trunk/qpid/cpp/src/qpid/sys/aix/
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interop_tests.py
      - copied unchanged from r1663687, 
qpid/trunk/qpid/cpp/src/tests/interop_tests.py
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py
      - copied unchanged from r1663687, 
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/tx.py
Removed:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeModules/CheckSizeTNativeType.cmake
Modified:
    qpid/branches/QPID-6262-JavaBrokerNIO/   (props changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/   (props changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/   (props changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt   
(contents, props changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/   (props 
changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/   (props changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/   (props changed)
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py
    qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -3,4 +3,4 @@
 /qpid/branches/java-broker-bdb-ha2:1576683-1583556
 /qpid/branches/java-network-refactor:805429-825319
 /qpid/branches/mcpierce-QPID-4719:1477004-1477093
-/qpid/trunk:1643238-1659605
+/qpid/trunk:1643238-1663687

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -6,4 +6,4 @@
 /qpid/branches/mcpierce-QPID-4719/qpid:1477004-1477093
 /qpid/branches/qpid-2935/qpid:1061302-1072333
 /qpid/branches/qpid-3346/qpid:1144319-1179855
-/qpid/trunk/qpid:1643238-1659605
+/qpid/trunk/qpid:1643238-1663687

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/CMakeLists.txt Tue Mar  3 
14:58:01 2015
@@ -23,7 +23,6 @@ set (CMAKE_BUILD_TYPE RelWithDebInfo CAC
 if (CMAKE_BUILD_TYPE MATCHES "Deb")
   set (has_debug_symbols " (has debug symbols)")
 endif (CMAKE_BUILD_TYPE MATCHES "Deb")
-message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}")
 
 project(qpid-cpp)
 
@@ -242,5 +241,5 @@ add_subdirectory(examples)
 include (CPack)
 
 # Build type message again, last so it is visible at end of output.
-message("Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}")
+message(STATUS "Build type is \"${CMAKE_BUILD_TYPE}\"${has_debug_symbols}")
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/include/qpid/types/Variant.h 
Tue Mar  3 14:58:01 2015
@@ -89,7 +89,9 @@ class QPID_TYPES_CLASS_EXTERN Variant
     QPID_TYPES_EXTERN Variant(float);
     QPID_TYPES_EXTERN Variant(double);
     QPID_TYPES_EXTERN Variant(const std::string&);
+    QPID_TYPES_EXTERN Variant(const std::string& value, const std::string& 
encoding);
     QPID_TYPES_EXTERN Variant(const char*);
+    QPID_TYPES_EXTERN Variant(const char* value, const char* encoding);
     QPID_TYPES_EXTERN Variant(const Map&);
     QPID_TYPES_EXTERN Variant(const List&);
     QPID_TYPES_EXTERN Variant(const Variant&);
@@ -156,9 +158,10 @@ class QPID_TYPES_CLASS_EXTERN Variant
     QPID_TYPES_EXTERN Map& asMap();
     QPID_TYPES_EXTERN const List& asList() const;
     QPID_TYPES_EXTERN List& asList();
+
     /**
-     * Unlike asString(), getString() will not do any conversions and
-     * will throw InvalidConversion if the type is not STRING.
+     * Unlike asString(), getString() will not do any conversions.
+     * @exception InvalidConversion if the type is not STRING.
      */
     QPID_TYPES_EXTERN const std::string& getString() const;
     QPID_TYPES_EXTERN std::string& getString();
@@ -168,9 +171,45 @@ class QPID_TYPES_CLASS_EXTERN Variant
 
     QPID_TYPES_EXTERN bool isEqualTo(const Variant& a) const;
 
+    /** Reset value to VOID, does not reset the descriptors. */
     QPID_TYPES_EXTERN void reset();
+
+    /** True if there is at least one descriptor associated with this variant. 
*/
+    QPID_TYPES_EXTERN bool isDescribed() const;
+
+    /** Get the first descriptor associated with this variant.
+     *
+     * Normally there is at most one descriptor, when there are multiple
+     * descriptors use getDescriptors()
+     *
+     *@return The first descriptor or VOID if there is no descriptor.
+     *@see isDescribed, getDescriptors
+     */
+    QPID_TYPES_EXTERN Variant getDescriptor() const;
+
+    /** Set a single descriptor for this Variant. The descriptor must be a 
string or integer. */
+    QPID_TYPES_EXTERN void setDescriptor(const Variant& descriptor);
+
+    /** Return a modifiable list of descriptors for this Variant.
+     * Used in case where there are multiple descriptors, for a single 
descriptor use
+     * getDescriptor and setDescriptor.
+     */
+    QPID_TYPES_EXTERN List& getDescriptors();
+
+    /** Return the list of descriptors for this Variant.
+     * Used in case where there are multiple descriptors, for a single 
descriptor use
+     * getDescriptor and setDescriptor.
+     */
+    QPID_TYPES_EXTERN const List& getDescriptors() const;
+
+    /** Create a described value */
+    QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, 
const Variant& value);
+
+    /** Create a described list, a common special case */
+    QPID_TYPES_EXTERN static Variant described(const Variant& descriptor, 
const List& value);
+
   private:
-    VariantImpl* impl;
+    mutable VariantImpl* impl;
 };
 
 #ifndef SWIG

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -6,4 +6,4 @@
 /qpid/branches/java-network-refactor/qpid/cpp/src:805429-825319
 /qpid/branches/qpid-2935/qpid/cpp/src:1061302-1072333
 /qpid/branches/qpid-3346/qpid/cpp/src:1144319-1179855
-/qpid/trunk/qpid/cpp/src:1643238-1659605
+/qpid/trunk/qpid/cpp/src:1643238-1663687

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt Tue Mar  
3 14:58:01 2015
@@ -42,7 +42,7 @@ include(CheckIncludeFiles)
 include(CheckIncludeFileCXX)
 include(CheckLibraryExists)
 include(CheckSymbolExists)
-include(CheckSizeTNativeType)
+include(CheckSizetDistinct)
 
 find_package(PkgConfig)
 find_package(Ruby)
@@ -351,7 +351,7 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windo
   mark_as_advanced(QPID_POLLER)
 endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
 
-check_size_t_native_type (QPID_SIZE_T_NATIVE)
+check_size_t_distinct (QPID_SIZE_T_DISTINCT)
 
 option(BUILD_SASL "Build with Cyrus SASL support" ${SASL_FOUND})
 if (BUILD_SASL)

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -7,4 +7,4 @@
 /qpid/branches/qpid-2393/qpid/cpp/src/CMakeLists.txt:1375790-1376954
 /qpid/branches/qpid-2935/qpid/cpp/src/CMakeLists.txt:1061302-1072333
 /qpid/branches/qpid-3346/qpid/cpp/src/CMakeLists.txt:1144319-1179855
-/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1643238-1658732
+/qpid/trunk/qpid/cpp/src/CMakeLists.txt:1643238-1663687

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/amqp.cmake Tue Mar  3 
14:58:01 2015
@@ -144,6 +144,8 @@ if (BUILD_AMQP)
          qpid/messaging/amqp/SessionHandle.cpp
          qpid/messaging/amqp/TcpTransport.h
          qpid/messaging/amqp/TcpTransport.cpp
+         qpid/messaging/amqp/Transaction.h
+         qpid/messaging/amqp/Transaction.cpp
         )
 
     if (WIN32)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/config.h.cmake Tue Mar  
3 14:58:01 2015
@@ -56,7 +56,7 @@
 #cmakedefine HAVE_SYS_SDT_H ${HAVE_SYS_SDT_H}
 #cmakedefine HAVE_LOG_AUTHPRIV
 #cmakedefine HAVE_LOG_FTP
-#cmakedefine QPID_SIZE_T_NATIVE
+#cmakedefine QPID_SIZE_T_DISTINCT
 #cmakedefine HAVE_PROTON_TRACER
 #cmakedefine USE_PROTON_TRANSPORT_CONDITION
 #cmakedefine HAVE_PROTON_EVENTS

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/legacystore.cmake Tue 
Mar  3 14:58:01 2015
@@ -39,8 +39,8 @@ else (DEFINED legacystore_force)
                 #
                 # allow legacystore to be built
                 #
-                message(STATUS "BerkeleyDB for C++ and libaio found, 
Legacystore support enabled")
-                set (legacystore_default ON)
+                message(STATUS "BerkeleyDB for C++ and libaio found, 
Legacystore support disabled by default (deprecated, use linearstore instead).")
+                set (legacystore_default OFF) # Disabled, deprecated. Use 
linearstore instead.
             else (HAVE_AIO AND HAVE_AIO_H)
                 if (NOT HAVE_AIO)
                     message(STATUS "Legacystore requires libaio which is 
absent.")

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/linearstore.cmake Tue 
Mar  3 14:58:01 2015
@@ -39,8 +39,8 @@ else (DEFINED linearstore_force)
                 #
                 # allow linearstore to be built
                 #
-                message(STATUS "BerkeleyDB for C++ and libaio found, 
Linearstore support may be enabled (currently experimental and disabled by 
default)")
-                set (linearstore_default OFF) # Temporarily disabled
+                message(STATUS "BerkeleyDB for C++ and libaio found, 
Linearstore support enabled.")
+                set (linearstore_default ON)
             else (HAVE_AIO AND HAVE_AIO_H)
                 if (NOT HAVE_AIO)
                     message(STATUS "Linearstore requires libaio which is 
absent.")

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Options.cpp Tue Mar 
 3 14:58:01 2015
@@ -146,7 +146,7 @@ template QPID_COMMON_EXTERN po::value_se
 template QPID_COMMON_EXTERN po::value_semantic* create_value(uint16_t& val, 
const std::string& arg);
 template QPID_COMMON_EXTERN po::value_semantic* create_value(uint32_t& val, 
const std::string& arg);
 template QPID_COMMON_EXTERN po::value_semantic* create_value(uint64_t& val, 
const std::string& arg);
-#ifdef QPID_SIZE_T_NATIVE
+#ifdef QPID_SIZE_T_DISTINCT
 template QPID_COMMON_EXTERN po::value_semantic* create_value(size_t& val, 
const std::string& arg);
 #endif
 template QPID_COMMON_EXTERN po::value_semantic* create_value(double& val, 
const std::string& arg);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/Url.cpp Tue Mar  3 
14:58:01 2015
@@ -113,8 +113,10 @@ class UrlParser {
         const char* at = std::find(i, end, '@');
         if (at == end) return false;
         const char* slash = std::find(i, at, '/');
-        url.setUser(string(i, slash));
-        const char* pass = (slash == at) ? slash : slash+1;
+        const char* colon = std::find(i, at, ':');
+        const char* sep = std::min(slash, colon);
+        url.setUser(string(i, sep));
+        const char* pass = (sep == at) ? sep : sep+1;
         url.setPass(string(pass, at));
         i = at+1;
         return true;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/CharSequence.cpp 
Tue Mar  3 14:58:01 2015
@@ -35,7 +35,7 @@ CharSequence::operator bool() const
 }
 std::string CharSequence::str() const
 {
-    return std::string(data, size);
+    return (data && size) ? std::string(data, size) : std::string();
 }
 
 CharSequence CharSequence::create()

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.cpp 
Tue Mar  3 14:58:01 2015
@@ -19,11 +19,17 @@
  *
  */
 #include "Descriptor.h"
+#include "descriptors.h"
+#include <qpid/framing/reply_exceptions.h>
+#include <map>
 
 namespace qpid {
 namespace amqp {
+
 Descriptor::Descriptor(uint64_t code) : type(NUMERIC) { value.code = code; }
+
 Descriptor::Descriptor(const CharSequence& symbol) : type(SYMBOLIC) { 
value.symbol = symbol; }
+
 bool Descriptor::match(const std::string& symbol, uint64_t code) const
 {
     switch (type) {
@@ -58,20 +64,85 @@ Descriptor* Descriptor::nest(const Descr
     return nested.get();
 }
 
-std::ostream& operator<<(std::ostream& os, const Descriptor& d)
-{
-    switch (d.type) {
-      case Descriptor::SYMBOLIC:
-        if (d.value.symbol.data && d.value.symbol.size) os << 
std::string(d.value.symbol.data, d.value.symbol.size);
-        else os << "null";
-        break;
-      case Descriptor::NUMERIC:
-        os << "0x" << std::hex << d.value.code;
-        break;
+namespace {
+
+class DescriptorMap {
+    typedef std::map<uint64_t, std::string> SymbolMap;
+    typedef std::map<std::string, uint64_t> CodeMap;
+
+    SymbolMap symbols;
+    CodeMap codes;
+
+  public:
+    DescriptorMap() {
+        symbols[message::HEADER_CODE] = message::HEADER_SYMBOL;
+        symbols[message::DELIVERY_ANNOTATIONS_CODE] = 
message::DELIVERY_ANNOTATIONS_SYMBOL;
+        symbols[message::MESSAGE_ANNOTATIONS_CODE] = 
message::MESSAGE_ANNOTATIONS_SYMBOL;
+        symbols[message::PROPERTIES_CODE] = message::PROPERTIES_SYMBOL;
+        symbols[message::APPLICATION_PROPERTIES_CODE] = 
message::APPLICATION_PROPERTIES_SYMBOL;
+        symbols[message::DATA_CODE] = message::DATA_SYMBOL;
+        symbols[message::AMQP_SEQUENCE_CODE] = message::AMQP_SEQUENCE_SYMBOL;
+        symbols[message::AMQP_VALUE_CODE] = message::AMQP_VALUE_SYMBOL;
+        symbols[message::FOOTER_CODE] = message::FOOTER_SYMBOL;
+        symbols[message::ACCEPTED_CODE] = message::ACCEPTED_SYMBOL;
+        symbols[sasl::SASL_MECHANISMS_CODE] = sasl::SASL_MECHANISMS_SYMBOL;
+        symbols[sasl::SASL_INIT_CODE] = sasl::SASL_INIT_SYMBOL;
+        symbols[sasl::SASL_CHALLENGE_CODE] = sasl::SASL_CHALLENGE_SYMBOL;
+        symbols[sasl::SASL_RESPONSE_CODE] = sasl::SASL_RESPONSE_SYMBOL;
+        symbols[sasl::SASL_OUTCOME_CODE] = sasl::SASL_OUTCOME_SYMBOL;
+        symbols[filters::LEGACY_DIRECT_FILTER_CODE] = 
filters::LEGACY_DIRECT_FILTER_SYMBOL;
+        symbols[filters::LEGACY_TOPIC_FILTER_CODE] = 
filters::LEGACY_TOPIC_FILTER_SYMBOL;
+        symbols[filters::LEGACY_HEADERS_FILTER_CODE] = 
filters::LEGACY_HEADERS_FILTER_SYMBOL;
+        symbols[filters::SELECTOR_FILTER_CODE] = 
filters::SELECTOR_FILTER_SYMBOL;
+        symbols[filters::XQUERY_FILTER_CODE] = filters::XQUERY_FILTER_SYMBOL;
+        symbols[lifetime_policy::DELETE_ON_CLOSE_CODE] = 
lifetime_policy::DELETE_ON_CLOSE_SYMBOL;
+        symbols[lifetime_policy::DELETE_ON_NO_LINKS_CODE] = 
lifetime_policy::DELETE_ON_NO_LINKS_SYMBOL;
+        symbols[lifetime_policy::DELETE_ON_NO_MESSAGES_CODE] = 
lifetime_policy::DELETE_ON_NO_MESSAGES_SYMBOL;
+        symbols[lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_CODE] = 
lifetime_policy::DELETE_ON_NO_LINKS_OR_MESSAGES_SYMBOL;
+        symbols[transaction::DECLARE_CODE] = transaction::DECLARE_SYMBOL;
+        symbols[transaction::DISCHARGE_CODE] = transaction::DISCHARGE_SYMBOL;
+        symbols[transaction::DECLARED_CODE] = transaction::DECLARED_SYMBOL;
+        symbols[transaction::TRANSACTIONAL_STATE_CODE] = 
transaction::TRANSACTIONAL_STATE_SYMBOL;
+        symbols[0] = "unknown-descriptor";
+
+        for (SymbolMap::const_iterator i = symbols.begin(); i != 
symbols.end(); ++i)
+            codes[i->second] = i->first;
+    }
+
+    std::string operator[](uint64_t code) const {
+        SymbolMap::const_iterator i = symbols.find(code);
+        return (i == symbols.end()) ? "unknown-descriptor" : i->second;
     }
-    if (d.nested.get()) {
-        os << " ->(" << *d.nested << ")";
+
+    uint64_t operator[](const std::string& symbol) const {
+         CodeMap::const_iterator i = codes.find(symbol);
+         return (i == codes.end()) ? 0 : i->second;
+     }
+};
+
+DescriptorMap DESCRIPTOR_MAP;
+}
+
+std::string Descriptor::symbol() const {
+    switch (type) {
+      case Descriptor::NUMERIC: return DESCRIPTOR_MAP[value.code];
+      case Descriptor::SYMBOLIC: return value.symbol.str();
+    }
+    assert(0);
+    return std::string();
+}
+
+uint64_t Descriptor::code() const {
+    switch (type) {
+      case Descriptor::NUMERIC: return value.code;
+      case Descriptor::SYMBOLIC: return DESCRIPTOR_MAP[value.symbol.str()];
     }
-    return os;
+    assert(0);
+    return 0;
 }
+
+std::ostream& operator<<(std::ostream& os, const Descriptor& d) {
+    return os << d.symbol() << "(" << "0x" << std::hex << d.code() << ")";
+}
+
 }} // namespace qpid::amqp

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Descriptor.h 
Tue Mar  3 14:58:01 2015
@@ -49,6 +49,8 @@ struct Descriptor
     QPID_COMMON_EXTERN bool match(const std::string&, uint64_t) const;
     QPID_COMMON_EXTERN size_t getSize() const;
     QPID_COMMON_EXTERN Descriptor* nest(const Descriptor& d);
+    QPID_COMMON_EXTERN std::string symbol() const;
+    QPID_COMMON_EXTERN uint64_t code() const;
 };
 
 QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream& os, const 
Descriptor& d);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.cpp 
Tue Mar  3 14:58:01 2015
@@ -31,10 +31,17 @@
 #include <string.h>
 
 using namespace qpid::types::encodings;
+using qpid::types::Variant;
 
 namespace qpid {
 namespace amqp {
 
+Encoder::Overflow::Overflow() : Exception("Buffer overflow in encoder!") {}
+
+Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0), 
grow(false) {}
+
+Encoder::Encoder() : data(0), size(0), position(0), grow(true) {}
+
 namespace {
 template <typename T> size_t encode(char* data, T i);
 template <> size_t encode<uint8_t>(char* data, uint8_t i)
@@ -406,6 +413,18 @@ void Encoder::writeList(const std::list<
 
 void Encoder::writeValue(const qpid::types::Variant& value, const Descriptor* 
d)
 {
+    if (d) {
+        writeDescriptor(*d);    // Write this descriptor before any in the 
value.
+        d = 0;
+    }
+    // Write any descriptors attached to the value.
+    const Variant::List& descriptors = value.getDescriptors();
+    for (Variant::List::const_iterator i = descriptors.begin(); i != 
descriptors.end(); ++i) {
+        if (i->getType() == types::VAR_STRING)
+            writeDescriptor(Descriptor(CharSequence::create(i->asString())));
+        else
+            writeDescriptor(Descriptor(i->asUint64()));
+    }
     switch (value.getType()) {
       case qpid::types::VAR_VOID:
         writeNull(d);
@@ -477,18 +496,28 @@ void Encoder::writeDescriptor(const Desc
           break;
     }
 }
+
 void Encoder::check(size_t s)
 {
     if (position + s > size) {
-        QPID_LOG(notice, "Buffer overflow for write of size " << s << " to 
buffer of size " << size << " at position " << position);
-        assert(false);
-        throw qpid::Exception("Buffer overflow in encoder!");
+        if (grow) {
+            buffer.resize(buffer.size() + s);
+            data = const_cast<char*>(buffer.data());
+            size = buffer.size();
+        }
+        else {
+            QPID_LOG(notice, "Buffer overflow for write of size " << s
+                     << " to buffer of size " << size << " at position " << 
position);
+            assert(false);
+            throw Overflow();
+        }
     }
 }
-Encoder::Encoder(char* d, size_t s) : data(d), size(s), position(0) {}
+
 size_t Encoder::getPosition() { return position; }
 size_t Encoder::getSize() const { return size; }
 char* Encoder::getData() { return data + position; }
+std::string Encoder::getBuffer() { return buffer; }
 void Encoder::resetPosition(size_t p) { assert(p <= size); position = p; }
 
 }} // namespace qpid::amqp

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/Encoder.h Tue 
Mar  3 14:58:01 2015
@@ -23,6 +23,7 @@
  */
 #include "qpid/sys/IntegerTypes.h"
 #include "qpid/amqp/Constructor.h"
+#include "qpid/Exception.h"
 #include <list>
 #include <map>
 #include <stddef.h>
@@ -43,6 +44,18 @@ struct Descriptor;
 class Encoder
 {
   public:
+    struct Overflow : public Exception { Overflow(); };
+
+    /** Create an encoder that writes into the buffer at data up to size bytes.
+     * Write operations throw Overflow if encoding exceeds size bytes.
+     */
+    QPID_COMMON_EXTERN Encoder(char* data, size_t size);
+
+    /** Create an encoder that manages its own buffer. Buffer grows to 
accomodate
+     * all encoded data. Call getBuffer() to get the buffer.
+     */
+    QPID_COMMON_EXTERN Encoder();
+
     void writeCode(uint8_t);
 
     void write(bool);
@@ -100,19 +113,27 @@ class Encoder
     QPID_COMMON_EXTERN void writeList(const std::list<qpid::types::Variant>& 
value, const Descriptor* d=0, bool large=true);
 
     void writeDescriptor(const Descriptor&);
-    QPID_COMMON_EXTERN Encoder(char* data, size_t size);
     QPID_COMMON_EXTERN size_t getPosition();
     void resetPosition(size_t p);
     char* skip(size_t);
     void writeBytes(const char* bytes, size_t count);
     virtual ~Encoder() {}
+
+    /** Return the total size of the buffer. */
     size_t getSize() const;
-  protected:
+
+    /** Return the growable buffer. */
+    std::string getBuffer();
+
+    /** Return the unused portion of the buffer. */
     char* getData();
+
   private:
     char* data;
     size_t size;
     size_t position;
+    bool grow;
+    std::string buffer;
 
     void write(const CharSequence& v, std::pair<uint8_t, uint8_t> codes, const 
Descriptor* d);
     void write(const std::string& v, std::pair<uint8_t, uint8_t> codes, const 
Descriptor* d);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/amqp/descriptors.h 
Tue Mar  3 14:58:01 2015
@@ -26,6 +26,9 @@
 namespace qpid {
 namespace amqp {
 
+// NOTE: If you add descriptor symbols and codes here, you must also update 
the DescriptorMap
+// constructor in Descriptor.cpp.
+
 namespace message {
 const std::string HEADER_SYMBOL("amqp:header:list");
 const std::string PROPERTIES_SYMBOL("amqp:properties:list");
@@ -36,6 +39,7 @@ const std::string AMQP_SEQUENCE_SYMBOL("
 const std::string AMQP_VALUE_SYMBOL("amqp:amqp-value:*");
 const std::string DATA_SYMBOL("amqp:data:binary");
 const std::string FOOTER_SYMBOL("amqp:footer:map");
+const std::string ACCEPTED_SYMBOL("amqp:accepted:list");
 
 const uint64_t HEADER_CODE(0x70);
 const uint64_t DELIVERY_ANNOTATIONS_CODE(0x71);
@@ -46,6 +50,7 @@ const uint64_t DATA_CODE(0x75);
 const uint64_t AMQP_SEQUENCE_CODE(0x76);
 const uint64_t AMQP_VALUE_CODE(0x77);
 const uint64_t FOOTER_CODE(0x78);
+const uint64_t ACCEPTED_CODE(0x24);
 
 const Descriptor HEADER(HEADER_CODE);
 const Descriptor DELIVERY_ANNOTATIONS(DELIVERY_ANNOTATIONS_CODE);

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -7,4 +7,4 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker:1061302-1072333
 /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker:1144319-1179855
 /qpid/branches/qpid-3890/qpid/cpp/src/qpid/broker:1299027-1303795
-/qpid/trunk/qpid/cpp/src/qpid/broker:1643238-1658732
+/qpid/trunk/qpid/cpp/src/qpid/broker:1643238-1663687

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/Queue.cpp 
Tue Mar  3 14:58:01 2015
@@ -297,6 +297,8 @@ void Queue::deliverTo(Message msg, TxBuf
         if (txn) {
             TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
             txn->enlist(op);
+            QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " 
<< name
+                     << " enlisted in " << txn);
         } else {
             if (enqueue(0, msg)) {
                 push(msg);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
 Tue Mar  3 14:58:01 2015
@@ -165,8 +165,8 @@ Connection::~Connection()
 {
     if (ticker) ticker->cancel();
     getBroker().getConnectionObservers().closed(*this);
-    pn_transport_free(transport);
     pn_connection_free(connection);
+    pn_transport_free(transport);
 #ifdef HAVE_PROTON_EVENTS
     pn_collector_free(collector);
 #endif

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Exception.h 
Tue Mar  3 14:58:01 2015
@@ -22,6 +22,7 @@
  *
  */
 #include <string>
+#include <qpid/Exception.h>
 
 namespace qpid {
 namespace broker {
@@ -29,7 +30,7 @@ namespace amqp {
 /**
  * Exception to signal various AMQP 1.0 defined conditions
  */
-class Exception : public std::exception
+class Exception : public qpid::Exception
 {
   public:
     Exception(const std::string& name, const std::string& description);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
 Tue Mar  3 14:58:01 2015
@@ -100,6 +100,7 @@ namespace {
             boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> 
copy(new Transfer(delivery, session));
             return copy;
         }
+
       private:
         pn_delivery_t* delivery;
         boost::shared_ptr<Session> session;
@@ -146,8 +147,8 @@ void DecodingIncoming::deliver(boost::in
 {
     qpid::broker::Message message(received, received);
     userid.verify(message.getUserId());
-    handle(message, session.getTransaction(delivery));
     received->begin();
+    handle(message, session.getTransaction(delivery));
     Transfer t(delivery, sessionPtr);
     received->end(t);
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
 Tue Mar  3 14:58:01 2015
@@ -28,6 +28,7 @@
 #include "qpid/broker/TopicKeyNode.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/Descriptor.h"
 #include "qpid/amqp/MessageEncoder.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -90,13 +91,13 @@ bool OutgoingFromQueue::doWork()
                 return true;
             } else {
                 pn_link_drained(link);
-                QPID_LOG(debug, "No message available on " << 
queue->getName());
+                QPID_LOG(trace, "No message available on " << 
queue->getName());
             }
         } catch (const qpid::framing::ResourceDeletedException& e) {
             throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, 
e.what());
         }
     } else {
-        QPID_LOG(debug, "Can't deliver to " << getName() << " from " << 
queue->getName() << ": " << pn_link_credit(link));
+        QPID_LOG(trace, "Can't deliver to " << getName() << " from " << 
queue->getName() << ": " << pn_link_credit(link));
     }
     return false;
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp 
Tue Mar  3 14:58:01 2015
@@ -61,6 +61,8 @@ namespace qpid {
 namespace broker {
 namespace amqp {
 
+using namespace qpid::amqp::transaction;
+
 namespace {
 pn_bytes_t convert(const std::string& s)
 {
@@ -209,6 +211,7 @@ class IncomingToCoordinator : public Dec
   public:
     IncomingToCoordinator(pn_link_t* link, Broker& broker, Session& parent)
         : DecodingIncoming(link, broker, parent, std::string(), "txn-ctrl", 
pn_link_name(link)) {}
+
     ~IncomingToCoordinator() { session.abort(); }
     void deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>, 
pn_delivery_t*);
     void handle(qpid::broker::Message&, qpid::broker::TxBuffer*) {}
@@ -218,7 +221,9 @@ class IncomingToCoordinator : public Dec
 Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o)
     : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), 
session(s), connection(c), out(o), deleted(false),
       authorise(connection.getUserId(), connection.getBroker().getAcl()),
-      detachRequested(), txnId((boost::format("%1%") % s).str()) {}
+      detachRequested(),
+      tx(*this)
+{}
 
 
 Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* 
terminus, bool incoming)
@@ -383,6 +388,7 @@ void Session::attach(pn_link_t* link)
         //i.e a subscription
         std::string name;
         if (pn_terminus_get_type(source) == PN_UNSPECIFIED) {
+            pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
             throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, 
"No source specified!");
         } else if (pn_terminus_is_dynamic(source)) {
             name = generateName(link);
@@ -399,6 +405,7 @@ void Session::attach(pn_link_t* link)
         pn_terminus_t* target = pn_link_remote_target(link);
         std::string name;
         if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
+            pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
             throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, 
"No target specified!");
         } else if (pn_terminus_get_type(target) == PN_COORDINATOR) {
             QPID_LOG(debug, "Received attach request for incoming link to 
transaction coordinator on " << this);
@@ -634,11 +641,12 @@ void Session::readable(pn_link_t* link,
         if (target->second->haveWork()) out.activateOutput();
     }
 }
+
 void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
 {
     OutgoingLinks::iterator sender = outgoing.find(link);
     if (sender == outgoing.end()) {
-        QPID_LOG(error, "Delivery returned for unknown link");
+        QPID_LOG(error, "Delivery returned for unknown link " << 
pn_link_name(link));
     } else {
         sender->second->handle(delivery);
     }
@@ -647,7 +655,7 @@ void Session::writable(pn_link_t* link,
 bool Session::dispatch()
 {
     bool output(false);
-    if (commitPending.boolCompareAndSwap(true, false)) {
+    if (tx.commitPending.boolCompareAndSwap(true, false)) {
         committed(true);
     }
     for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end();) {
@@ -735,7 +743,7 @@ void Session::detachedByManagement()
 
 TxBuffer* Session::getTransaction(const std::string& id)
 {
-    return  (txn.get() && id == txnId) ? txn.get() : 0;
+    return  (tx.buffer.get() && id == tx.id) ? tx.buffer.get() : 0;
 }
 
 TxBuffer* Session::getTransaction(pn_delivery_t* delivery)
@@ -746,42 +754,41 @@ TxBuffer* Session::getTransaction(pn_del
 std::pair<TxBuffer*,uint64_t> Session::getTransactionalState(pn_delivery_t* 
delivery)
 {
     std::pair<TxBuffer*,uint64_t> result((TxBuffer*)0, 0);
-    if (pn_delivery_remote_state(delivery) == 
qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE) {
+    if (pn_delivery_remote_state(delivery) == TRANSACTIONAL_STATE_CODE) {
         pn_data_t* data = pn_disposition_data(pn_delivery_remote(delivery));
-        if (data && pn_data_next(data)) {
-            size_t count = pn_data_get_list(data);
-            if (count > 0) {
+        pn_data_rewind(data);
+        size_t count = 0;
+        if (data && pn_data_next(data) && (count = pn_data_get_list(data)) > 
0) {
+            pn_data_enter(data);
+            pn_data_next(data);
+            std::string id = convert(pn_data_get_binary(data));
+            result.first = getTransaction(id);
+            if (!result.first) {
+                QPID_LOG(error, "Transaction not found for id: " << id);
+            }
+            if (count > 1 && pn_data_next(data)) {
                 pn_data_enter(data);
                 pn_data_next(data);
-                std::string id = convert(pn_data_get_binary(data));
-                result.first = getTransaction(id);
-                if (!result.first) {
-                    QPID_LOG(error, "Transaction not found for id: " << id);
-                }
-                if (count > 1 && pn_data_next(data) && 
pn_data_is_described(data)) {
-                    pn_data_enter(data);
-                    pn_data_next(data);
-                    result.second = pn_data_get_ulong(data);
-                }
-                pn_data_exit(data);
+                result.second = pn_data_get_ulong(data);
             }
-        } else {
-            QPID_LOG(error, "Transactional delivery " << delivery << " appears 
to have no data");
         }
+        else
+            QPID_LOG(error, "Transactional delivery " << delivery << " appears 
to have no data");
     }
     return result;
 }
 
 std::string Session::declare()
 {
-    if (txn.get()) {
+    if (tx.buffer.get()) {
         //not sure what the error code should be; none in spec really fit well.
-        throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, 
"Session only supports one transaction active at a time");
+        throw Exception(qpid::amqp::error_conditions::transaction::ROLLBACK,
+                        "Session only supports one transaction active at a 
time");
     }
-    txn = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
-    connection.getBroker().getBrokerObservers().startTx(txn);
+    tx.buffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
+    connection.getBroker().getBrokerObservers().startTx(tx.buffer);
     txStarted();
-    return txnId;
+    return tx.id;
 }
 
 namespace {
@@ -795,32 +802,41 @@ namespace {
             boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> 
copy(new AsyncCommit(session));
             return copy;
         }
+
       private:
         boost::shared_ptr<Session> session;
     };
 }
 
-void Session::discharge(const std::string& id, bool failed)
+void Session::discharge(const std::string& id, bool failed, pn_delivery_t* 
delivery)
 {
-    if (!txn.get() || id != txnId) {
-        throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID, 
"No transaction declared with that id");
+    QPID_LOG(debug, "Coordinator " <<  (failed ? " rollback" : " commit")
+             << " transaction " << id);
+    if (!tx.buffer.get() || id != tx.id) {
+        throw Exception(qpid::amqp::error_conditions::transaction::UNKNOWN_ID,
+                        Msg() << "Cannot discharge transaction " << id
+                        << (tx.buffer.get() ? Msg() << ", current transaction 
is " << tx.id :
+                            Msg() << ", no current transaction"));
     }
+    tx.discharge = delivery;
     if (failed) {
         abort();
     } else {
-        txn->begin();
-        txn->startCommit(&connection.getBroker().getStore());
+        tx.buffer->begin();
+        tx.buffer->startCommit(&connection.getBroker().getStore());
         AsyncCommit callback(shared_from_this());
-        txn->end(callback);
+        tx.buffer->end(callback);
     }
 }
 
 void Session::abort()
 {
-    if (txn) {
-        txn->rollback();
+    if (tx.buffer) {
+        tx.dischargeComplete();
+        tx.buffer->rollback();
         txAborted();
-        txn = boost::intrusive_ptr<TxBuffer>();
+        tx.buffer.reset();
+        QPID_LOG(debug, "Transaction " << tx.id << " rolled back");
     }
 }
 
@@ -828,16 +844,18 @@ void Session::committed(bool sync)
 {
     if (sync) {
         //this is on IO thread
-        if (txn.get()) {
-            txn->endCommit(&connection.getBroker().getStore());
+        tx.dischargeComplete();
+        if (tx.buffer.get()) {
+            tx.buffer->endCommit(&connection.getBroker().getStore());
             txCommitted();
-            txn = boost::intrusive_ptr<TxBuffer>();
+            tx.buffer.reset();
+            QPID_LOG(debug, "Transaction " << tx.id << " comitted");
         } else {
             throw 
Exception(qpid::amqp::error_conditions::transaction::ROLLBACK, "tranaction 
vanished during async commit");
         }
     } else {
         //this is not on IO thread, need to delay processing until on IO thread
-        if (commitPending.boolCompareAndSwap(false, true)) {
+        if (tx.commitPending.boolCompareAndSwap(false, true)) {
             qpid::sys::Mutex::ScopedLock l(lock);
             if (!deleted) {
                 out.activateOutput();
@@ -878,7 +896,7 @@ void IncomingToCoordinator::deliver(boos
 {
     if (message && message->isTypedBody()) {
         QPID_LOG(debug, "Coordinator got message: @" << 
message->getBodyDescriptor() << " " << message->getTypedBody());
-        if 
(message->getBodyDescriptor().match(qpid::amqp::transaction::DECLARE_SYMBOL, 
qpid::amqp::transaction::DECLARE_CODE)) {
+        if (message->getBodyDescriptor().match(DECLARE_SYMBOL, DECLARE_CODE)) {
             std::string id = session.declare();
             //encode the txn id in a 'declared' list on the disposition
             pn_data_t* data = pn_disposition_data(pn_delivery_local(delivery));
@@ -887,22 +905,38 @@ void IncomingToCoordinator::deliver(boos
             pn_data_put_binary(data, convert(id));
             pn_data_exit(data);
             pn_data_exit(data);
-            pn_delivery_update(delivery, 
qpid::amqp::transaction::DECLARED_CODE);
+            pn_delivery_update(delivery, DECLARED_CODE);
             pn_delivery_settle(delivery);
             session.incomingMessageAccepted();
-        } else if 
(message->getBodyDescriptor().match(qpid::amqp::transaction::DISCHARGE_SYMBOL, 
qpid::amqp::transaction::DISCHARGE_CODE)) {
+            QPID_LOG(debug, "Coordinator declared transaction " << id);
+        } else if (message->getBodyDescriptor().match(DISCHARGE_SYMBOL, 
DISCHARGE_CODE)) {
             if (message->getTypedBody().getType() == qpid::types::VAR_LIST) {
                 qpid::types::Variant::List args = 
message->getTypedBody().asList();
                 qpid::types::Variant::List::const_iterator i = args.begin();
                 if (i != args.end()) {
                     std::string id = *i;
                     bool failed = ++i != args.end() ? i->asBool() : false;
-                    session.discharge(id, failed);
-                    DecodingIncoming::deliver(message, delivery);//ensures 
async completion of commit is taken care of
+                    session.discharge(id, failed, delivery);
                 }
+
+            } else {
+                throw framing::IllegalArgumentException(
+                    Msg() << "Coordinator unknown message: @" <<
+                    message->getBodyDescriptor() << " " << 
message->getTypedBody());
             }
         }
     }
 }
 
+Session::Transaction::Transaction(Session& s) :
+    session(s), id((boost::format("%1%") % &s).str()), discharge(0) {}
+
+// Called in IO thread to signal completion of dischage by settling discharge 
message.
+void Session::Transaction::dischargeComplete() {
+    if (buffer.get() && discharge) {
+        session.accepted(discharge, false); // Queue up accept and activate 
output.
+        discharge = 0;
+    }
+}
+
 }}} // namespace qpid::broker::amqp

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.h 
Tue Mar  3 14:58:01 2015
@@ -91,7 +91,7 @@ class Session : public ManagedSession, p
     std::pair<TxBuffer*,uint64_t> getTransactionalState(pn_delivery_t*);
     //transaction coordination:
     std::string declare();
-    void discharge(const std::string& id, bool failed);
+    void discharge(const std::string& id, bool failed, pn_delivery_t*);
     void abort();
   protected:
     void detachedByManagement();
@@ -109,9 +109,18 @@ class Session : public ManagedSession, p
     std::set< boost::shared_ptr<Queue> > exclusiveQueues;
     Authorise authorise;
     bool detachRequested;
-    boost::intrusive_ptr<TxBuffer> txn;
-    std::string txnId;
-    qpid::sys::AtomicValue<bool> commitPending;
+
+    struct Transaction {
+        Transaction(Session&);
+        void dischargeComplete();
+
+        Session& session;
+        boost::intrusive_ptr<TxBuffer> buffer;
+        std::string id;
+        qpid::sys::AtomicValue<bool> commitPending;
+        pn_delivery_t* discharge;
+    };
+    Transaction tx;
 
     struct ResolvedNode
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
 Tue Mar  3 14:58:01 2015
@@ -205,7 +205,7 @@ void ConnectionHandler::fail(const std::
 {
     errorCode = CLOSE_CODE_FRAMING_ERROR;
     errorText = message;
-    QPID_LOG(warning, message);
+    QPID_LOG(debug, message);
     setState(FAILED);
 }
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
 Tue Mar  3 14:58:01 2015
@@ -510,9 +510,9 @@ void AddressHelper::checkAssertion(pn_te
                             requested.erase(j->first);
                         }
                     } else if (key == AUTO_DELETE) {
-                        PnData(data).read(v);
+                        PnData(data).get(v);
                         isAutoDeleted = v.asBool();
-                    } else if (j != requested.end() && (PnData(data).read(v) 
&& v.asString() == j->second.asString())) {
+                    } else if (j != requested.end() && (PnData(data).get(v) && 
v.asString() == j->second.asString())) {
                         requested.erase(j->first);
                     }
                 }
@@ -646,7 +646,7 @@ void AddressHelper::configure(pn_link_t*
                 } else {
                     pn_data_put_ulong(filter, i->descriptorCode);
                 }
-                PnData(filter).write(i->value);
+                PnData(filter).put(i->value);
                 pn_data_exit(filter);
             }
             pn_data_exit(filter);
@@ -733,7 +733,7 @@ void AddressHelper::setNodeProperties(pn
                 putLifetimePolicy(data, 
toLifetimePolicy(i->second.asString()));
             } else {
                 pn_data_put_symbol(data, convert(i->first));
-                PnData(data).write(i->second);
+                PnData(data).put(i->second);
             }
         }
         pn_data_exit(data);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
 Tue Mar  3 14:58:01 2015
@@ -25,8 +25,11 @@
 #include "Sasl.h"
 #include "SenderContext.h"
 #include "SessionContext.h"
+#include "Transaction.h"
 #include "Transport.h"
 #include "qpid/amqp/descriptors.h"
+#include "qpid/amqp/Encoder.h"
+#include "qpid/amqp/Descriptor.h"
 #include "qpid/messaging/exceptions.h"
 #include "qpid/messaging/AddressImpl.h"
 #include "qpid/messaging/Duration.h"
@@ -43,6 +46,7 @@
 #include "qpid/sys/urlAdd.h"
 #include "config.h"
 #include <boost/lexical_cast.hpp>
+#include <boost/bind.hpp>
 #include <vector>
 extern "C" {
 #include <proton/engine.h>
@@ -151,20 +155,23 @@ ConnectionContext::~ConnectionContext()
     if (ticker) ticker->cancel();
     close();
     sessions.clear();
-    pn_transport_free(engine);
     pn_connection_free(connection);
+    pn_transport_free(engine);
 }
 
 bool ConnectionContext::isOpen() const
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return state == CONNECTED && pn_connection_state(connection) & 
(PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
 }
 
 void ConnectionContext::sync(boost::shared_ptr<SessionContext> ssn)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    //wait for outstanding sends to settle
+    sys::Monitor::ScopedLock l(lock);
+    syncLH(ssn, l);
+}
+
+void ConnectionContext::syncLH(boost::shared_ptr<SessionContext> ssn, 
sys::Monitor::ScopedLock&) {
     while (!ssn->settled()) {
         QPID_LOG(debug, "Waiting for sends to settle on sync()");
         wait(ssn);//wait until message has been confirmed
@@ -175,18 +182,13 @@ void ConnectionContext::sync(boost::shar
 
 void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
         //explicitly release messages that have yet to be fetched
         for (SessionContext::ReceiverMap::iterator i = ssn->receivers.begin(); 
i != ssn->receivers.end(); ++i) {
             drain_and_release_messages(ssn, i->second);
         }
-        //wait for outstanding sends to settle
-        while (!ssn->settled()) {
-            QPID_LOG(debug, "Waiting for sends to settle before closing");
-            wait(ssn);//wait until message has been confirmed
-            wakeupDriver();
-        }
+        syncLH(ssn, l);
     }
 
     if (pn_session_state(ssn->session) & PN_REMOTE_ACTIVE) {
@@ -199,17 +201,11 @@ void ConnectionContext::endSession(boost
 
 void ConnectionContext::close()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (state != CONNECTED) return;
     if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
         for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); 
++i) {
-            //wait for outstanding sends to settle
-            while (!i->second->settled()) {
-                QPID_LOG(debug, "Waiting for sends to settle before closing");
-                wait(i->second);//wait until message has been confirmed
-            }
-
-
+            syncLH(i->second, l);
             if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
                 pn_session_close(i->second->session);
             }
@@ -246,7 +242,7 @@ bool ConnectionContext::fetch(boost::sha
      */
     qpid::sys::AtomicCount::ScopedIncrement track(lnk->fetching);
     {
-        qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        sys::Monitor::ScopedLock l(lock);
         checkClosed(ssn, lnk);
         if (!lnk->capacity) {
             pn_link_flow(lnk->receiver, 1);
@@ -257,10 +253,10 @@ bool ConnectionContext::fetch(boost::sha
         return true;
     } else {
         {
-            qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+            sys::Monitor::ScopedLock l(lock);
             pn_link_drain(lnk->receiver, 0);
             wakeupDriver();
-            while (pn_link_credit(lnk->receiver) && 
!pn_link_queued(lnk->receiver)) {
+            while (pn_link_draining(lnk->receiver) && 
!pn_link_queued(lnk->receiver)) {
                 QPID_LOG(debug, "Waiting for message or for credit to be 
drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << 
pn_link_queued(lnk->receiver));
                 wait(ssn, lnk);
             }
@@ -269,7 +265,7 @@ bool ConnectionContext::fetch(boost::sha
             }
         }
         if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
-            qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+            sys::Monitor::ScopedLock l(lock);
             if (lnk->capacity) {
                 pn_link_flow(lnk->receiver, 1);
                 wakeupDriver();
@@ -296,7 +292,7 @@ bool ConnectionContext::get(boost::share
 {
     qpid::sys::AbsTime until(convert(timeout));
     while (true) {
-        qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        sys::Monitor::ScopedLock l(lock);
         checkClosed(ssn, lnk);
         pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
         QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
@@ -320,6 +316,9 @@ bool ConnectionContext::get(boost::share
                     haveOutput = true;
                 }
             }
+            // Automatically ack messages if we are in a transaction.
+            if (ssn->transaction)
+                acknowledgeLH(ssn, &message, false, l);
             return true;
         } else if (until > qpid::sys::now()) {
             waitUntil(ssn, lnk, until);
@@ -334,7 +333,7 @@ boost::shared_ptr<ReceiverContext> Conne
 {
     qpid::sys::AbsTime until(convert(timeout));
     while (true) {
-        qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+        sys::Monitor::ScopedLock l(lock);
         checkClosed(ssn);
         boost::shared_ptr<ReceiverContext> r = ssn->nextReceiver();
         if (r) {
@@ -347,9 +346,13 @@ boost::shared_ptr<ReceiverContext> Conne
     }
 }
 
-void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, 
qpid::messaging::Message* message, bool cumulative)
+void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, 
qpid::messaging::Message* message, bool cumulative) {
+    sys::Monitor::ScopedLock l(lock);
+    acknowledgeLH(ssn, message, cumulative, l);
+}
+
+void ConnectionContext::acknowledgeLH(boost::shared_ptr<SessionContext> ssn, 
qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     checkClosed(ssn);
     if (message) {
         ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), 
cumulative);
@@ -361,7 +364,7 @@ void ConnectionContext::acknowledge(boos
 
 void ConnectionContext::nack(boost::shared_ptr<SessionContext> ssn, 
qpid::messaging::Message& message, bool reject)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     checkClosed(ssn);
     ssn->nack(MessageImplAccess::get(message).getInternalId(), reject);
     wakeupDriver();
@@ -369,7 +372,7 @@ void ConnectionContext::nack(boost::shar
 
 void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<SenderContext> lnk)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (pn_link_state(lnk->sender) & PN_LOCAL_ACTIVE) {
         lnk->close();
     }
@@ -401,7 +404,7 @@ void ConnectionContext::drain_and_releas
 
 void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<ReceiverContext> lnk)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     drain_and_release_messages(ssn, lnk);
     if (pn_link_state(lnk->receiver) & PN_LOCAL_ACTIVE) {
         lnk->close();
@@ -415,7 +418,7 @@ void ConnectionContext::detach(boost::sh
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<SenderContext> lnk)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     lnk->configure();
     attach(ssn, lnk->sender);
     checkClosed(ssn, lnk);
@@ -425,7 +428,7 @@ void ConnectionContext::attach(boost::sh
 
 void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<ReceiverContext> lnk)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     lnk->configure();
     attach(ssn, lnk->receiver, lnk->capacity);
     checkClosed(ssn, lnk);
@@ -445,11 +448,26 @@ void ConnectionContext::attach(boost::sh
     }
 }
 
-void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, 
bool sync)
+void ConnectionContext::send(
+    boost::shared_ptr<SessionContext> ssn,
+    boost::shared_ptr<SenderContext> snd,
+    const qpid::messaging::Message& message,
+    bool sync,
+    SenderContext::Delivery** delivery)
+{
+    sys::Monitor::ScopedLock l(lock);
+    sendLH(ssn, snd, message, sync, delivery, l);
+}
+
+void ConnectionContext::sendLH(
+    boost::shared_ptr<SessionContext> ssn,
+    boost::shared_ptr<SenderContext> snd,
+    const qpid::messaging::Message& message,
+    bool sync,
+    SenderContext::Delivery** delivery,
+    sys::Monitor::ScopedLock&)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     checkClosed(ssn);
-    SenderContext::Delivery* delivery(0);
     while (pn_transport_pending(engine) > 65536) {
         QPID_LOG(debug, "Have " << pn_transport_pending(engine) << " bytes of 
output pending; waiting for this to be written...");
         notifyOnWrite = true;
@@ -457,17 +475,17 @@ void ConnectionContext::send(boost::shar
         wait(ssn, snd);
         notifyOnWrite = false;
     }
-    while (!snd->send(message, &delivery)) {
+    while (!snd->send(message, delivery)) {
         QPID_LOG(debug, "Waiting for capacity...");
         wait(ssn, snd);//wait for capacity
     }
     wakeupDriver();
-    if (sync && delivery) {
-        while (!delivery->delivered()) {
+    if (sync && *delivery) {
+        while (!(*delivery)->delivered()) {
             QPID_LOG(debug, "Waiting for confirmation...");
             wait(ssn, snd);//wait until message has been confirmed
         }
-        if (delivery->rejected()) {
+        if ((*delivery)->rejected()) {
             throw MessageRejected("Message was rejected by peer");
         }
 
@@ -476,46 +494,46 @@ void ConnectionContext::send(boost::shar
 
 void ConnectionContext::setCapacity(boost::shared_ptr<SenderContext> sender, 
uint32_t capacity)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     sender->setCapacity(capacity);
 }
 uint32_t ConnectionContext::getCapacity(boost::shared_ptr<SenderContext> 
sender)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return sender->getCapacity();
 }
 uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<SenderContext> 
sender)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return sender->getUnsettled();
 }
 
 void ConnectionContext::setCapacity(boost::shared_ptr<ReceiverContext> 
receiver, uint32_t capacity)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     receiver->setCapacity(capacity);
     pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());
     wakeupDriver();
 }
 uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> 
receiver)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return receiver->getCapacity();
 }
 uint32_t ConnectionContext::getAvailable(boost::shared_ptr<ReceiverContext> 
receiver)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return receiver->getAvailable();
 }
 uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> 
receiver)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return receiver->getUnsettled();
 }
 
 void ConnectionContext::activateOutput()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (state == CONNECTED) wakeupDriver();
 }
 /**
@@ -543,8 +561,8 @@ pn_state_t IS_CLOSED = PN_LOCAL_CLOSED |
 
 void ConnectionContext::reset()
 {
-    pn_transport_free(engine);
     pn_connection_free(connection);
+    pn_transport_free(engine);
 
     engine = pn_transport();
     connection = pn_connection();
@@ -555,7 +573,7 @@ void ConnectionContext::reset()
     }
 }
 
-void ConnectionContext::check() {
+bool ConnectionContext::check() {
     if (checkDisconnected()) {
         if (ConnectionOptions::reconnect) {
             QPID_LOG(notice, "Auto-reconnecting to " << fullUrl);
@@ -564,7 +582,9 @@ void ConnectionContext::check() {
         } else {
             throw qpid::messaging::TransportFailure("Disconnected (reconnect 
disabled)");
         }
+        return true;
     }
+    return false;
 }
 
 bool ConnectionContext::checkDisconnected() {
@@ -588,7 +608,7 @@ bool ConnectionContext::checkDisconnecte
 
 void ConnectionContext::wait()
 {
-    check();
+    if (check()) return;        // Reconnected, may need to re-test condition.
     lock.wait();
     check();
 }
@@ -630,6 +650,7 @@ void ConnectionContext::waitUntil(boost:
 void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
 {
     check();
+    ssn->error.raise();
     if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
         pn_condition_t* error = pn_session_remote_condition(ssn->session);
         std::stringstream text;
@@ -690,6 +711,7 @@ void ConnectionContext::checkClosed(boos
 
 void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
 {
+    if (s->error) return;
     pn_session_open(s->session);
     wakeupDriver();
     while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
@@ -718,26 +740,31 @@ void ConnectionContext::restartSession(b
 
 boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool 
transactional, const std::string& n)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    if (transactional) throw qpid::messaging::MessagingException("Transactions 
not yet supported");
+    boost::shared_ptr<SessionContext> session;
     std::string name = n.empty() ? qpid::framing::Uuid(true).str() : n;
-    SessionMap::const_iterator i = sessions.find(name);
-    if (i == sessions.end()) {
-        boost::shared_ptr<SessionContext> s(new SessionContext(connection));
-        s->setName(name);
-        s->session = pn_session(connection);
-        pn_session_open(s->session);
-        wakeupDriver();
-        while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
-            wait();
+    {
+        sys::Monitor::ScopedLock l(lock);
+        SessionMap::const_iterator i = sessions.find(name);
+        if (i == sessions.end()) {
+            session = boost::shared_ptr<SessionContext>(new 
SessionContext(connection));
+            session->setName(name);
+            pn_session_open(session->session);
+            wakeupDriver();
+            sessions[name] = session; // Add it now so it will be restarted if 
we reconnect in wait()
+            while (pn_session_state(session->session) & PN_REMOTE_UNINIT) {
+                wait();
+            }
+        } else {
+            throw qpid::messaging::KeyError(std::string("Session already 
exists: ") + name);
         }
-        sessions[name] = s;
-        return s;
-    } else {
-        throw qpid::messaging::KeyError(std::string("Session already exists: 
") + name);
-    }
 
+    }
+    if (transactional) {        // Outside of lock
+        startTxSession(session);
+    }
+    return session;
 }
+
 boost::shared_ptr<SessionContext> ConnectionContext::getSession(const 
std::string& name) const
 {
     SessionMap::const_iterator i = sessions.find(name);
@@ -760,7 +787,7 @@ std::string ConnectionContext::getAuthen
 
 std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t 
size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     QPID_LOG(trace, id << " decode(" << size << ")");
     if (readHeader) {
         size_t decoded = readProtocolHeader(buffer, size);
@@ -805,7 +832,7 @@ std::size_t ConnectionContext::decodePla
 }
 std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     QPID_LOG(trace, id << " encode(" << size << ")");
     if (writeHeader) {
         size_t encoded = writeProtocolHeader(buffer, size);
@@ -843,19 +870,19 @@ std::size_t ConnectionContext::encodePla
 }
 bool ConnectionContext::canEncodePlain()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / 
qpid::sys::TIME_MSEC);
     return haveOutput && state == CONNECTED;
 }
 void ConnectionContext::closed()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     state = DISCONNECTED;
     lock.notifyAll();
 }
 void ConnectionContext::opened()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     state = CONNECTED;
     lock.notifyAll();
 }
@@ -921,7 +948,7 @@ const qpid::messaging::ConnectionOptions
 
 std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     size_t decoded = 0;
     try {
         if (sasl.get() && !sasl->authenticated()) {
@@ -939,7 +966,7 @@ std::size_t ConnectionContext::decode(co
 }
 std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     size_t encoded = 0;
     try {
         if (sasl.get() && sasl->canEncode()) {
@@ -957,7 +984,7 @@ std::size_t ConnectionContext::encode(ch
 }
 bool ConnectionContext::canEncode()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (sasl.get()) {
         try {
             if (sasl->canEncode()) return true;
@@ -978,26 +1005,21 @@ const std::string CLIENT_PPID("qpid.clie
 }
 void ConnectionContext::setProperties()
 {
-    pn_data_t* data = pn_connection_properties(connection);
-    pn_data_put_map(data);
-    pn_data_enter(data);
-
-    pn_data_put_symbol(data, PnData::str(CLIENT_PROCESS_NAME));
-    std::string processName = sys::SystemInfo::getProcessName();
-    pn_data_put_string(data, PnData::str(processName));
-
-    pn_data_put_symbol(data, PnData::str(CLIENT_PID));
-    pn_data_put_int(data, sys::SystemInfo::getProcessId());
-
-    pn_data_put_symbol(data, PnData::str(CLIENT_PPID));
-    pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
-
+    PnData data(pn_connection_properties(connection));
+    pn_data_put_map(data.data);
+    pn_data_enter(data.data);
+    data.putSymbol(CLIENT_PROCESS_NAME);
+    data.putSymbol(sys::SystemInfo::getProcessName());
+    data.putSymbol(CLIENT_PID);
+    data.put(int32_t(sys::SystemInfo::getProcessId()));
+    data.putSymbol(CLIENT_PPID);
+    data.put(int32_t(sys::SystemInfo::getParentProcessId()));
     for (Variant::Map::const_iterator i = properties.begin(); i != 
properties.end(); ++i)
     {
-        pn_data_put_symbol(data, PnData::str(i->first));
-        PnData(data).write(i->second);
+        data.putSymbol(i->first);
+        data.put(i->second);
     }
-    pn_data_exit(data);
+    pn_data_exit(data.data);
 }
 
 const qpid::sys::SecuritySettings* 
ConnectionContext::getTransportSecuritySettings()
@@ -1007,7 +1029,7 @@ const qpid::sys::SecuritySettings* Conne
 
 void ConnectionContext::open()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (state != DISCONNECTED) throw 
qpid::messaging::ConnectionError("Connection was already opened!");
     if (!driver) driver = DriverImpl::getDefault();
     QPID_LOG(info, "Starting connection to " << fullUrl);
@@ -1049,7 +1071,7 @@ void ConnectionContext::autoconnect()
 
 void ConnectionContext::reconnect(const Url& url) {
     QPID_LOG(notice, "Reconnecting to " << url);
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     if (state != DISCONNECTED) throw 
qpid::messaging::ConnectionError("Connection was already opened!");
     if (!driver) driver = DriverImpl::getDefault();
     reset();
@@ -1137,7 +1159,7 @@ bool ConnectionContext::tryOpenAddr(cons
 
 std::string ConnectionContext::getUrl() const
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+    sys::Monitor::ScopedLock l(lock);
     return (state == CONNECTED) ? currentUrl.str() : std::string();
 }
 
@@ -1209,6 +1231,40 @@ bool ConnectionContext::CodecAdapter::ca
     return context.canEncodePlain();
 }
 
+void ConnectionContext::startTxSession(boost::shared_ptr<SessionContext> 
session) {
+    try {
+        QPID_LOG(debug, id << " attaching transaction for " << 
session->getName());
+        boost::shared_ptr<Transaction> tx(new Transaction(session->session));
+        session->transaction = tx;
+        attach(session, tx);
+        tx->declare(boost::bind(&ConnectionContext::send, this, _1, _2, _3, 
_4, _5), session);
+    } catch (const Exception& e) {
+        throw TransactionError(Msg() << "Cannot start transaction: " << 
e.what());
+    }
+}
+
+void ConnectionContext::discharge(boost::shared_ptr<SessionContext> session, 
bool fail) {
+    {
+        sys::Monitor::ScopedLock l(lock);
+        checkClosed(session);
+        if (!session->transaction)
+            throw TransactionError("No Transaction");
+        Transaction::SendFunction sendFn = boost::bind(
+            &ConnectionContext::sendLH, this, _1, _2, _3, _4, _5, 
boost::ref(l));
+        syncLH(session, boost::ref(l)); // Sync to make sure all tx transfers 
have been received.
+        session->transaction->discharge(sendFn, session, fail);
+        session->transaction->declare(sendFn, session);
+    }
+}
+
+void ConnectionContext::commit(boost::shared_ptr<SessionContext> session) {
+    discharge(session, false);
+}
+
+void ConnectionContext::rollback(boost::shared_ptr<SessionContext> session) {
+    discharge(session, true);
+}
+
 
 // setup the transport and connection objects:
 void ConnectionContext::configureConnection()

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
 Tue Mar  3 14:58:01 2015
@@ -34,6 +34,7 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/types/Variant.h"
 #include "qpid/messaging/amqp/TransportContext.h"
+#include "SenderContext.h"
 
 struct pn_connection_t;
 struct pn_link_t;
@@ -59,7 +60,6 @@ class DriverImpl;
 class ReceiverContext;
 class Sasl;
 class SessionContext;
-class SenderContext;
 class Transport;
 
 /**
@@ -82,10 +82,20 @@ class ConnectionContext : public qpid::s
     void detach(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<ReceiverContext>);
     void drain_and_release_messages(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<ReceiverContext>);
     bool isClosed(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<ReceiverContext>);
-    void send(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext> ctxt, const qpid::messaging::Message& message, 
bool sync);
+
+    // Link operations
+    void send(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext> ctxt,
+              const qpid::messaging::Message& message, bool sync,
+              SenderContext::Delivery** delivery);
+
     bool fetch(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, 
qpid::messaging::Duration timeout);
     bool get(boost::shared_ptr<SessionContext> ssn, 
boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, 
qpid::messaging::Duration timeout);
+
+    // Session operations
     void acknowledge(boost::shared_ptr<SessionContext> ssn, 
qpid::messaging::Message* message, bool cumulative);
+    void commit(boost::shared_ptr<SessionContext> ssn);
+    void rollback(boost::shared_ptr<SessionContext> ssn);
+
     void nack(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message& 
message, bool reject);
     void sync(boost::shared_ptr<SessionContext> ssn);
     boost::shared_ptr<ReceiverContext> 
nextReceiver(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Duration 
timeout);
@@ -93,10 +103,10 @@ class ConnectionContext : public qpid::s
     void setOption(const std::string& name, const qpid::types::Variant& value);
     std::string getAuthenticatedUsername();
 
+    // Link operations
     void setCapacity(boost::shared_ptr<SenderContext>, uint32_t);
     uint32_t getCapacity(boost::shared_ptr<SenderContext>);
     uint32_t getUnsettled(boost::shared_ptr<SenderContext>);
-
     void setCapacity(boost::shared_ptr<ReceiverContext>, uint32_t);
     uint32_t getCapacity(boost::shared_ptr<ReceiverContext>);
     uint32_t getAvailable(boost::shared_ptr<ReceiverContext>);
@@ -159,9 +169,12 @@ class ConnectionContext : public qpid::s
     bool notifyOnWrite;
     boost::intrusive_ptr<qpid::sys::TimerTask> ticker;
 
-    void check();
+    bool check();
     bool checkDisconnected();
     void waitNoReconnect();
+
+    // NOTE: All wait*() functions must be called in a loop that checks for the
+    // waited condition with the lock held.
     void wait();
     void waitUntil(qpid::sys::AbsTime until);
     void wait(boost::shared_ptr<SessionContext>);
@@ -170,10 +183,12 @@ class ConnectionContext : public qpid::s
     void wait(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext>);
     void waitUntil(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<ReceiverContext>, qpid::sys::AbsTime until);
     void waitUntil(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext>, qpid::sys::AbsTime until);
+
     void checkClosed(boost::shared_ptr<SessionContext>);
     void checkClosed(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<ReceiverContext>);
     void checkClosed(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext>);
     void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
+
     void wakeupDriver();
     void attach(boost::shared_ptr<SessionContext>, pn_link_t*, int credit=0);
     void autoconnect();
@@ -194,8 +209,18 @@ class ConnectionContext : public qpid::s
     std::string getError();
     bool useSasl();
     void setProperties();
+
     void configureConnection();
     bool checkTransportError(std::string&);
+
+    void discharge(boost::shared_ptr<SessionContext>, bool fail);
+    void startTxSession(boost::shared_ptr<SessionContext>);
+
+    void syncLH(boost::shared_ptr<SessionContext> ssn, 
sys::Monitor::ScopedLock&);
+    void sendLH(boost::shared_ptr<SessionContext>, 
boost::shared_ptr<SenderContext> ctxt,
+                const qpid::messaging::Message& message, bool sync,
+                SenderContext::Delivery** delivery, sys::Monitor::ScopedLock&);
+    void acknowledgeLH(boost::shared_ptr<SessionContext> ssn, 
qpid::messaging::Message* message, bool cumulative, sys::Monitor::ScopedLock&);
 };
 
 }}} // namespace qpid::messaging::amqp



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to