astitcher commented on code in PR #437:
URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2515167904


##########
cpp/src/messaging_adapter.cpp:
##########
@@ -274,13 +277,6 @@ void on_link_local_open(messaging_handler& handler, 
pn_event_t* event) {
 
 void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
     auto lnk = pn_event_link(event);
-    // Currently don't implement (transaction) coordinator
-    if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
-      auto error = pn_link_condition(lnk);
-      pn_condition_set_name(error, "amqp:not-implemented");
-      pn_link_close(lnk);
-      return;
-    }

Review Comment:
   I think this deletion isn't exactly correct: Whilst we do now need to cope 
with a remote coordinator this is only on the link the implementation itself 
has opened to control transactions not any other general link so we should 
still not allow incoming coordinator links in general.



##########
cpp/src/messaging_adapter.cpp:
##########
@@ -40,6 +41,7 @@
 
 #include <proton/connection.h>
 #include <proton/delivery.h>
+#include <proton/disposition.h>

Review Comment:
   Why is this needed?



##########
cpp/src/sender.cpp:
##########
@@ -26,10 +26,12 @@
 #include "proton/source.hpp"
 #include "proton/target.hpp"
 #include "proton/tracker.hpp"
+#include "types_internal.hpp"

Review Comment:
   This include is in the wrong block of includes - it is purely internal so 
should in the bottom block



##########
cpp/src/session.cpp:
##########
@@ -21,13 +21,22 @@
 #include "proton/session.hpp"
 
 #include "proton/connection.hpp"
+#include "proton/delivery.h"
+#include "proton/delivery.hpp"
+#include "proton/error.hpp"
 #include "proton/receiver_options.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/session_options.hpp"
+#include "proton/target_options.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/tracker.hpp"
+#include "proton/transfer.hpp"
+#include "types_internal.hpp"

Review Comment:
   In wrong header block



##########
cpp/src/session.cpp:
##########
@@ -70,6 +79,7 @@ std::string next_link_name(const connection& c) {
 
     return ln ? ln->link_name() : uuid::random().str();
 }
+

Review Comment:
   not needed blank line



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it

Review Comment:
   This *Cannot* be a static, it must be part of the txn_impl somehow.



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};

Review Comment:
   I'd prefer if the construct/destructors were after the data member but 
before everything else



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);
+            return;
+        } else {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_id = proton::bin(txn_id);
+            state = State::DECLARED;
+            handler->on_session_open(session);
+            return;
+        }
+    } else if (state == State::DISCHARGING) {
+        // Attempting to commit/abort transaction
+        if (pn_disposition_is_failed(disposition)) {
+            if (!failed) {
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_commit_failed(session);
+                release_pending();

Review Comment:
   Delete - pending is never used



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+

Review Comment:
   Delete - pending is never used



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);

Review Comment:
   Use after free: handler is used after the transaction_impl is deleted! Also 
no need to set the state as the impl is just about to be deleted anyway.



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);
+            return;
+        } else {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_id = proton::bin(txn_id);
+            state = State::DECLARED;
+            handler->on_session_open(session);
+            return;
+        }
+    } else if (state == State::DISCHARGING) {
+        // Attempting to commit/abort transaction
+        if (pn_disposition_is_failed(disposition)) {
+            if (!failed) {
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_commit_failed(session);
+                release_pending();
+                return;
+            } else {
+                state = State::FREE;
+                transaction_delete(session);
+                // Transaction abort failed.
+                return;
+            }
+        } else {
+            if (failed) {
+                // Transaction abort is successful
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_aborted(session);
+                release_pending();
+                return;
+            } else {
+                // Transaction commit is successful
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_committed(session);

Review Comment:
   Use after free



##########
cpp/src/session.cpp:
##########
@@ -21,13 +21,22 @@
 #include "proton/session.hpp"
 
 #include "proton/connection.hpp"
+#include "proton/delivery.h"
+#include "proton/delivery.hpp"
+#include "proton/error.hpp"

Review Comment:
   These are all C headers so should be their own block below.



##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +105,22 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// Get user data from this session.
     PN_CPP_EXTERN void* user_data() const;
 
+    PN_CPP_EXTERN void transaction_declare(proton::messaging_handler &handler, 
bool settle_before_discharge = false);
+    PN_CPP_EXTERN bool transaction_is_declared();
+    PN_CPP_EXTERN proton::binary transaction_id() const;
+    PN_CPP_EXTERN void transaction_commit();
+    PN_CPP_EXTERN void transaction_abort();

Review Comment:
   Need documentation for these APIs



##########
cpp/include/proton/target_options.hpp:
##########
@@ -60,6 +60,10 @@ class target_options {
     /// address is ignored if dynamic() is true.
     PN_CPP_EXTERN target_options& address(const std::string& addr);
 
+    /// Set the target be of type coordinator.
+    /// This immediately override the currently assigned type.
+    PN_CPP_EXTERN target_options& make_coordinator();
+

Review Comment:
   I don't think this is needed to use the transaction API. If it's used in the 
implementation it needs to be purely internal. But I don't think it's actually 
used at all.



##########
cpp/examples/simple_recv.cpp:
##########


Review Comment:
   None of the changes in this file relate to transactions, I recommend 
removing all changes in this file:
   The change is useful, but something unnecessary for a simple example, moving 
the code more in the direction of a generally useful utility. Having such a 
general receive utility would be good, but maybe should be in a different place.
   



##########
cpp/src/sender.cpp:
##########
@@ -26,10 +26,12 @@
 #include "proton/source.hpp"
 #include "proton/target.hpp"
 #include "proton/tracker.hpp"
+#include "types_internal.hpp"
 
 #include <proton/delivery.h>
 #include <proton/link.h>
 #include <proton/types.h>
+#include <proton/session.hpp>

Review Comment:
   This include is inthe wrong block - it is c++ so should be in te block above 
(inserted in alphabetical order)



##########
cpp/src/contexts.hpp:
##########
@@ -25,6 +25,7 @@
 #include "reconnect_options_impl.hpp"
 
 #include "proton/work_queue.hpp"
+#include "proton/session.hpp"

Review Comment:
   I can't see a reason this include should be needed by the new lines.



##########
cpp/src/node_options.cpp:
##########


Review Comment:
   As above I don't think these changes are needed



##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +105,22 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// Get user data from this session.
     PN_CPP_EXTERN void* user_data() const;
 
+    PN_CPP_EXTERN void transaction_declare(proton::messaging_handler &handler, 
bool settle_before_discharge = false);

Review Comment:
   I don't think it makes sense to have a separate handler for callbacks 
related to the transaction: The session can (by the current API design) only 
have a single transaction active so the session handler can just receive any 
transaction callbacks - there can be no ambiguity which transaction is being 
referred to.



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);
+            return;
+        } else {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_id = proton::bin(txn_id);
+            state = State::DECLARED;
+            handler->on_session_open(session);
+            return;
+        }
+    } else if (state == State::DISCHARGING) {
+        // Attempting to commit/abort transaction
+        if (pn_disposition_is_failed(disposition)) {
+            if (!failed) {
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_commit_failed(session);

Review Comment:
   Use after free again



##########
cpp/src/session.cpp:
##########
@@ -21,13 +21,22 @@
 #include "proton/session.hpp"
 
 #include "proton/connection.hpp"
+#include "proton/delivery.h"
+#include "proton/delivery.hpp"
+#include "proton/error.hpp"
 #include "proton/receiver_options.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/session_options.hpp"
+#include "proton/target_options.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/tracker.hpp"
+#include "proton/transfer.hpp"
+#include "types_internal.hpp"
 
 #include "contexts.hpp"
 #include "link_namer.hpp"
 #include "proton_bits.hpp"
+#include <proton/types.hpp>

Review Comment:
   In wrong header block



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);

Review Comment:
   The tests in here are strange and maybe wrong: `pn_declared_disposition` 
will return `nullptr` if the disposition is not a declared disposition, for 
`pn_disposition_is_failed` to ever return true the disposition must be a 
modified disposition. So the tests maybe should be reordered?



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}

Review Comment:
   `auto& _txn_impl` - space after `&` not before.
   *But* don't use identifiers with leading `_` at all.



##########
cpp/src/session.cpp:
##########
@@ -21,13 +21,22 @@
 #include "proton/session.hpp"
 
 #include "proton/connection.hpp"
+#include "proton/delivery.h"
+#include "proton/delivery.hpp"
+#include "proton/error.hpp"
 #include "proton/receiver_options.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/session_options.hpp"
+#include "proton/target_options.hpp"
+#include "proton/messaging_handler.hpp"

Review Comment:
   Out of alphabetical order



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }

Review Comment:
   Don't use NULL in C++ - use nullptr



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);

Review Comment:
   Can't be static - part of txn_impl as well?
   Also use a more useful sender name.



##########
cpp/src/contexts.hpp:
##########
@@ -152,6 +154,7 @@ class session_context : public context {
     session_context() : handler(0), user_data_(nullptr) {}
     static session_context& get(pn_session_t* s);
 
+    transaction_impl* _txn_impl;

Review Comment:
   Naming: don't start identifiers with `_`
   Shouldn't this be a std::unique_ptr? The context owns this pointer (its 
really an optional part of the session context) there is no other place this 
pointer is stored or referenced except in the session code.



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {

Review Comment:
   might as well use `enum class` here to avoid any namespace pollution or 
unintentional casting.



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {

Review Comment:
   Change `_failed`



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);

Review Comment:
   Can be replaced with:
   `send_ctrl("amqp:discharge:list", std::list<proton::value>{transaction_id, 
failed});`



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");

Review Comment:
   Spelling error "associated". Grammar error "This session has an 
associated...". But change message to something briefer: "Session already has 
transaction" for example.



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);

Review Comment:
   This can be replaced with:
   send_ctrl("amqp:declare:list", std::list<proton::value>{});



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;

Review Comment:
   Delete - pending is never used



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {

Review Comment:
   Replace `_value`



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;

Review Comment:
   Can replace with:
   `return txn_ctrl.send(msg_value);`



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);
+            return;
+        } else {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_id = proton::bin(txn_id);
+            state = State::DECLARED;
+            handler->on_session_open(session);
+            return;
+        }
+    } else if (state == State::DISCHARGING) {
+        // Attempting to commit/abort transaction
+        if (pn_disposition_is_failed(disposition)) {
+            if (!failed) {
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_commit_failed(session);
+                release_pending();
+                return;
+            } else {
+                state = State::FREE;
+                transaction_delete(session);
+                // Transaction abort failed.
+                return;
+            }
+        } else {
+            if (failed) {
+                // Transaction abort is successful
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_aborted(session);
+                release_pending();
+                return;
+            } else {
+                // Transaction commit is successful
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_committed(session);
+                return;
+            }
+        }
+        pending.clear();

Review Comment:
   Delete - pending is never used



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);
+            return;
+        } else {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_id = proton::bin(txn_id);
+            state = State::DECLARED;
+            handler->on_session_open(session);
+            return;
+        }
+    } else if (state == State::DISCHARGING) {
+        // Attempting to commit/abort transaction
+        if (pn_disposition_is_failed(disposition)) {
+            if (!failed) {
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_commit_failed(session);
+                release_pending();
+                return;
+            } else {
+                state = State::FREE;
+                transaction_delete(session);
+                // Transaction abort failed.
+                return;
+            }
+        } else {
+            if (failed) {
+                // Transaction abort is successful
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_aborted(session);

Review Comment:
   Use after free



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;

Review Comment:
   don't need `enum` here - it's not C!



##########
cpp/src/messaging_adapter.cpp:
##########
@@ -69,7 +71,8 @@ void on_link_flow(messaging_handler& handler, pn_event_t* 
event) {
     // TODO: process session flow data, if no link-specific data, just return.
     if (!lnk) return;
     int state = pn_link_state(lnk);
-    if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
+    if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR ||

Review Comment:
   I still think this isn't needed



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();
+
+    proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+    void handle_outcome(proton::tracker t);
+    transaction_impl(proton::sender &_txn_ctrl,
+                     proton::messaging_handler &_handler,
+                     bool _settle_before_discharge);
+    ~transaction_impl();
+};
+
+
+namespace {
+
+bool transaction_is_empty(const session& s) { return 
session_context::get(unwrap(s))._txn_impl == NULL; }
+
+void transaction_handle_outcome(const session& s, proton::tracker t) {
+    session_context::get(unwrap(s))._txn_impl->handle_outcome(t);
+}
+
+void transaction_delete(const session& s) { auto &_txn_impl = 
session_context::get(unwrap(s))._txn_impl; delete _txn_impl; _txn_impl = 
nullptr;}
+
+}
+
+void session::transaction_declare(proton::messaging_handler &handler, bool 
settle_before_discharge) {
+    auto &txn_impl = session_context::get(pn_object())._txn_impl;
+    if (txn_impl == nullptr) {
+        // Create _txn_impl
+        proton::connection conn = this->connection();
+        class InternalTransactionHandler : public proton::messaging_handler {
+
+            void on_tracker_settle(proton::tracker &t) override {
+                if (!transaction_is_empty(t.session())) {
+                    transaction_handle_outcome(t.session(), t);
+                }
+            }
+        };
+
+        proton::target_options opts;
+        std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+        opts.capabilities(cap);
+        opts.make_coordinator();
+
+        proton::sender_options so;
+        so.name("txn-ctrl");
+        so.target(opts);
+
+        static InternalTransactionHandler internal_handler; // 
internal_handler going out of scope. Fix it
+        so.handler(internal_handler);
+
+        static proton::sender s = conn.open_sender("does not matter", so);
+
+        settle_before_discharge = false;
+
+        txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+    }
+    // Declare txn
+    txn_impl->declare();
+}
+
+
+proton::binary session::transaction_id() const { return 
session_context::get(pn_object())._txn_impl->transaction_id; }
+void session::transaction_commit() { 
session_context::get(pn_object())._txn_impl->commit(); }
+void session::transaction_abort() { 
session_context::get(pn_object())._txn_impl->abort(); }
+bool session::transaction_is_declared() { return 
(!transaction_is_empty(*this)) && 
session_context::get(pn_object())._txn_impl->state == 
transaction_impl::State::DECLARED; }
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+                                   proton::messaging_handler &_handler,
+                                   bool _settle_before_discharge)
+    : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+    discharge(false);
+}
+
+void transaction_impl::abort() {
+    discharge(true);
+}
+
+void transaction_impl::declare() {
+    if (state != transaction_impl::State::FREE)
+        throw proton::error("This session has some associcated transaction 
already");
+    state = State::DECLARING;
+
+    proton::symbol descriptor("amqp:declare:list");
+    std::list<proton::value> vd;
+    proton::value i_am_null;
+    vd.push_back(i_am_null);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+    if (state != transaction_impl::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    state = State::DISCHARGING;
+
+    failed = _failed;
+    proton::symbol descriptor("amqp:discharge:list");
+    std::list<proton::value> vd;
+    vd.push_back(transaction_id);
+    vd.push_back(failed);
+    proton::value _value = vd;
+    send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor, 
proton::value _value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << _value
+        << proton::codec::finish();
+
+
+    proton::message msg = msg_value;
+    proton::tracker delivery = txn_ctrl.send(msg);
+    return delivery;
+}
+
+void transaction_impl::release_pending() {
+    for (auto d : pending) {
+        delivery d2(make_wrapper<delivery>(unwrap(d)));
+        d2.release();
+    }
+    pending.clear();
+}
+
+void transaction_impl::handle_outcome(proton::tracker t) {
+    pn_disposition_t *disposition = pn_delivery_remote(unwrap(t));
+    pn_declared_disposition_t *declared_disp = 
pn_declared_disposition(disposition);
+    proton::session session = t.session();
+    if (state == State::DECLARING) {
+        // Attempting to declare transaction
+        if (pn_disposition_is_failed(disposition)) {
+            state = State::FREE;
+            transaction_delete(session);
+            // on_transaction_declare_failed
+            handler->on_session_error(session);
+            return;
+        } else {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_id = proton::bin(txn_id);
+            state = State::DECLARED;
+            handler->on_session_open(session);
+            return;
+        }
+    } else if (state == State::DISCHARGING) {
+        // Attempting to commit/abort transaction
+        if (pn_disposition_is_failed(disposition)) {
+            if (!failed) {
+                state = State::FREE;
+                transaction_delete(session);
+                handler->on_session_transaction_commit_failed(session);
+                release_pending();
+                return;
+            } else {
+                state = State::FREE;
+                transaction_delete(session);

Review Comment:
   No need for state change as deleted



##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,209 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+class transaction_impl {
+  public:
+    proton::sender txn_ctrl;
+    proton::messaging_handler *handler = nullptr;
+    proton::binary transaction_id;
+    bool failed = false;
+    enum State {
+      FREE,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    enum State state = State::FREE;
+    std::vector<proton::tracker> pending;
+
+    void commit();
+    void abort();
+    void declare();
+
+    void discharge(bool failed);
+    void release_pending();

Review Comment:
   Delete this as deleting pending



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to