[
https://issues.apache.org/jira/browse/PROTON-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17947062#comment-17947062
]
ASF GitHub Bot commented on PROTON-1442:
----------------------------------------
astitcher commented on code in PR #437:
URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2058501754
##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public
internal::object<pn_session_t>, public endp
/// Get user data from this session.
PN_CPP_EXTERN void* user_data() const;
+ PN_CPP_EXTERN void declare_transaction(proton::transaction_handler
&handler, bool settle_before_discharge = false);
+
+ PN_CPP_EXTERN bool txn_is_empty();
+ PN_CPP_EXTERN bool txn_is_declared();
+ PN_CPP_EXTERN void txn_commit();
+ PN_CPP_EXTERN void txn_abort();
+ PN_CPP_EXTERN void txn_declare();
+ PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
+ PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message
msg);
+ PN_CPP_EXTERN void txn_accept(delivery &t);
+ PN_CPP_EXTERN proton::connection txn_connection() const;
+
/// @cond INTERNAL
friend class internal::factory<session>;
friend class session_iterator;
+ friend class transaction_impl;
/// @endcond
+
+ private:
+ // clean up txn internally
+ void txn_delete();
Review Comment:
I'd be very surprised if this is needed here - I'd think it could only be
called from the impl class
##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public
internal::object<pn_session_t>, public endp
/// Get user data from this session.
PN_CPP_EXTERN void* user_data() const;
+ PN_CPP_EXTERN void declare_transaction(proton::transaction_handler
&handler, bool settle_before_discharge = false);
+
+ PN_CPP_EXTERN bool txn_is_empty();
+ PN_CPP_EXTERN bool txn_is_declared();
+ PN_CPP_EXTERN void txn_commit();
+ PN_CPP_EXTERN void txn_abort();
+ PN_CPP_EXTERN void txn_declare();
+ PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
+ PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message
msg);
+ PN_CPP_EXTERN void txn_accept(delivery &t);
+ PN_CPP_EXTERN proton::connection txn_connection() const;
Review Comment:
Aren't these just internal functions? They should only be needed inside
session_impl
##########
cpp/include/proton/session.hpp:
##########
@@ -36,6 +36,7 @@
struct pn_session_t;
namespace proton {
+ class transaction_impl;
Review Comment:
Is this actually needed here?
##########
cpp/src/messaging_adapter.cpp:
##########
@@ -274,11 +299,13 @@ void on_link_local_open(messaging_handler& handler,
pn_event_t* event) {
void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
auto lnk = pn_event_link(event);
- // Currently don't implement (transaction) coordinator
- if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
- auto error = pn_link_condition(lnk);
- pn_condition_set_name(error, "amqp:not-implemented");
- pn_link_close(lnk);
+ if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) {
+ auto cond = pn_link_condition(lnk);
+ if (pn_condition_is_set(cond)) {
+ pn_condition_set_name(cond, "amqp:on_link_remote_open:FAILED");
+ pn_link_close(lnk);
+ return;
+ }
Review Comment:
I think this change might be wrong - the C++ binding still doesn't support
an incoming coordinator target.
[Even though we do now use an outgoing transaction coordinator target]
##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,231 @@ void* session::user_data() const {
return sctx.user_data_;
}
+
+
+class transaction_impl {
+ public:
+ proton::sender txn_ctrl;
+ proton::transaction_handler *handler = nullptr;
+ proton::binary transaction_id;
+ bool failed = false;
+ enum State {
+ FREE,
+ DECLARING,
+ DECLARED,
+ DISCHARGING,
+ };
+ enum State state = State::FREE;
+ std::vector<proton::tracker> pending;
+
+ void commit();
+ void abort();
+ void declare();
+ proton::tracker send(proton::sender s, proton::message msg);
+
+ void discharge(bool failed);
+ void release_pending();
+ void accept(delivery &d);
+ void update(tracker &d, uint64_t state);
+
+ proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+ void handle_outcome(proton::tracker t);
+ transaction_impl(proton::sender &_txn_ctrl,
+ proton::transaction_handler &_handler,
+ bool _settle_before_discharge);
+ ~transaction_impl();
+};
+
+void session::declare_transaction(proton::transaction_handler &handler, bool
settle_before_discharge) {
+ auto &txn_impl = session_context::get(pn_object())._txn_impl;
+ if (txn_impl == nullptr) {
+ // Create _txn_impl
+ proton::connection conn = this->connection();
+ class InternalTransactionHandler : public proton::messaging_handler {
+
+ void on_tracker_settle(proton::tracker &t) override {
+ if (!t.session().txn_is_empty()) {
+ t.session().txn_handle_outcome(t);
+ }
+ }
+ };
+
+ proton::target_options opts;
+ std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+ opts.capabilities(cap);
+ opts.mark_coordinator();
+
+ proton::sender_options so;
+ so.name("txn-ctrl");
+ so.target(opts);
+
+ static InternalTransactionHandler internal_handler; //
internal_handler going out of scope. Fix it
+ so.handler(internal_handler);
+
+ static proton::sender s = conn.open_sender("does not matter", so);
+
+ settle_before_discharge = false;
+
+ txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+ }
+ // Declare txn
+ txn_impl->declare();
+}
+
+void session::txn_delete() { auto &_txn_impl =
session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl =
nullptr;}
+void session::txn_commit() {
session_context::get(pn_object())._txn_impl->commit(); }
+void session::txn_abort() {
session_context::get(pn_object())._txn_impl->abort(); }
+void session::txn_declare() {
session_context::get(pn_object())._txn_impl->declare(); }
+bool session::txn_is_empty() { return
session_context::get(pn_object())._txn_impl == NULL; }
+bool session::txn_is_declared() { return (!txn_is_empty()) &&
session_context::get(pn_object())._txn_impl->state ==
transaction_impl::State::DECLARED; }
+void session::txn_accept(delivery &t) { return
session_context::get(pn_object())._txn_impl->accept(t); }
+proton::tracker session::txn_send(proton::sender s, proton::message msg) {
+ return session_context::get(pn_object())._txn_impl->send(s, msg);
+}
+void session::txn_handle_outcome(proton::tracker t) {
+ session_context::get(pn_object())._txn_impl->handle_outcome(t);
+}
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+ proton::transaction_handler &_handler,
+ bool _settle_before_discharge)
+ : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+ discharge(false);
+}
+
+void transaction_impl::abort() {
+ discharge(true);
+}
+
+void transaction_impl::declare() {
+ if (state != transaction_impl::State::FREE)
+ throw proton::error("This session has some associcated transaction
already");
+ state = State::DECLARING;
+
+ proton::symbol descriptor("amqp:declare:list");
+ std::list<proton::value> vd;
+ proton::value i_am_null;
+ vd.push_back(i_am_null);
+ proton::value _value = vd;
+ send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+ if (state != transaction_impl::State::DECLARED)
+ throw proton::error("Only a declared txn can be discharged.");
+ state = State::DISCHARGING;
+
+ failed = _failed;
+ proton::symbol descriptor("amqp:discharge:list");
+ std::list<proton::value> vd;
+ vd.push_back(transaction_id);
+ vd.push_back(failed);
+ proton::value _value = vd;
+ send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor,
proton::value _value) {
+ proton::value msg_value;
+ proton::codec::encoder enc(msg_value);
+ enc << proton::codec::start::described()
+ << descriptor
+ << _value
+ << proton::codec::finish();
+
+
+ proton::message msg = msg_value;
+ proton::tracker delivery = txn_ctrl.send(msg);
+ return delivery;
+}
+
+proton::tracker transaction_impl::send(proton::sender s, proton::message msg) {
+ if (state != transaction_impl::State::DECLARED)
+ throw proton::error("Only a declared transaction can send a message");
+ proton::tracker tracker = s.send(msg);
+ update(tracker, 0x34);
+ return tracker;
+}
+
+void transaction_impl::accept(delivery &t) {
+ t.settle();
Review Comment:
Needs update(t, PN_ACCEPT);
##########
cpp/src/session.cpp:
##########
@@ -148,4 +158,231 @@ void* session::user_data() const {
return sctx.user_data_;
}
+
+
+class transaction_impl {
+ public:
+ proton::sender txn_ctrl;
+ proton::transaction_handler *handler = nullptr;
+ proton::binary transaction_id;
+ bool failed = false;
+ enum State {
+ FREE,
+ DECLARING,
+ DECLARED,
+ DISCHARGING,
+ };
+ enum State state = State::FREE;
+ std::vector<proton::tracker> pending;
+
+ void commit();
+ void abort();
+ void declare();
+ proton::tracker send(proton::sender s, proton::message msg);
+
+ void discharge(bool failed);
+ void release_pending();
+ void accept(delivery &d);
+ void update(tracker &d, uint64_t state);
+
+ proton::tracker send_ctrl(proton::symbol descriptor, proton::value _value);
+ void handle_outcome(proton::tracker t);
+ transaction_impl(proton::sender &_txn_ctrl,
+ proton::transaction_handler &_handler,
+ bool _settle_before_discharge);
+ ~transaction_impl();
+};
+
+void session::declare_transaction(proton::transaction_handler &handler, bool
settle_before_discharge) {
+ auto &txn_impl = session_context::get(pn_object())._txn_impl;
+ if (txn_impl == nullptr) {
+ // Create _txn_impl
+ proton::connection conn = this->connection();
+ class InternalTransactionHandler : public proton::messaging_handler {
+
+ void on_tracker_settle(proton::tracker &t) override {
+ if (!t.session().txn_is_empty()) {
+ t.session().txn_handle_outcome(t);
+ }
+ }
+ };
+
+ proton::target_options opts;
+ std::vector<symbol> cap = {proton::symbol("amqp:local-transactions")};
+ opts.capabilities(cap);
+ opts.mark_coordinator();
+
+ proton::sender_options so;
+ so.name("txn-ctrl");
+ so.target(opts);
+
+ static InternalTransactionHandler internal_handler; //
internal_handler going out of scope. Fix it
+ so.handler(internal_handler);
+
+ static proton::sender s = conn.open_sender("does not matter", so);
+
+ settle_before_discharge = false;
+
+ txn_impl = new transaction_impl(s, handler, settle_before_discharge);
+ }
+ // Declare txn
+ txn_impl->declare();
+}
+
+void session::txn_delete() { auto &_txn_impl =
session_context::get(pn_object())._txn_impl; delete _txn_impl; _txn_impl =
nullptr;}
+void session::txn_commit() {
session_context::get(pn_object())._txn_impl->commit(); }
+void session::txn_abort() {
session_context::get(pn_object())._txn_impl->abort(); }
+void session::txn_declare() {
session_context::get(pn_object())._txn_impl->declare(); }
+bool session::txn_is_empty() { return
session_context::get(pn_object())._txn_impl == NULL; }
+bool session::txn_is_declared() { return (!txn_is_empty()) &&
session_context::get(pn_object())._txn_impl->state ==
transaction_impl::State::DECLARED; }
+void session::txn_accept(delivery &t) { return
session_context::get(pn_object())._txn_impl->accept(t); }
+proton::tracker session::txn_send(proton::sender s, proton::message msg) {
+ return session_context::get(pn_object())._txn_impl->send(s, msg);
+}
+void session::txn_handle_outcome(proton::tracker t) {
+ session_context::get(pn_object())._txn_impl->handle_outcome(t);
+}
+
+transaction_impl::transaction_impl(proton::sender &_txn_ctrl,
+ proton::transaction_handler &_handler,
+ bool _settle_before_discharge)
+ : txn_ctrl(_txn_ctrl), handler(&_handler) {
+}
+transaction_impl::~transaction_impl() {}
+
+void transaction_impl::commit() {
+ discharge(false);
+}
+
+void transaction_impl::abort() {
+ discharge(true);
+}
+
+void transaction_impl::declare() {
+ if (state != transaction_impl::State::FREE)
+ throw proton::error("This session has some associcated transaction
already");
+ state = State::DECLARING;
+
+ proton::symbol descriptor("amqp:declare:list");
+ std::list<proton::value> vd;
+ proton::value i_am_null;
+ vd.push_back(i_am_null);
+ proton::value _value = vd;
+ send_ctrl(descriptor, _value);
+}
+
+void transaction_impl::discharge(bool _failed) {
+ if (state != transaction_impl::State::DECLARED)
+ throw proton::error("Only a declared txn can be discharged.");
+ state = State::DISCHARGING;
+
+ failed = _failed;
+ proton::symbol descriptor("amqp:discharge:list");
+ std::list<proton::value> vd;
+ vd.push_back(transaction_id);
+ vd.push_back(failed);
+ proton::value _value = vd;
+ send_ctrl(descriptor, _value);
+}
+
+proton::tracker transaction_impl::send_ctrl(proton::symbol descriptor,
proton::value _value) {
+ proton::value msg_value;
+ proton::codec::encoder enc(msg_value);
+ enc << proton::codec::start::described()
+ << descriptor
+ << _value
+ << proton::codec::finish();
+
+
+ proton::message msg = msg_value;
+ proton::tracker delivery = txn_ctrl.send(msg);
+ return delivery;
+}
+
+proton::tracker transaction_impl::send(proton::sender s, proton::message msg) {
+ if (state != transaction_impl::State::DECLARED)
+ throw proton::error("Only a declared transaction can send a message");
+ proton::tracker tracker = s.send(msg);
+ update(tracker, 0x34);
+ return tracker;
+}
+
+void transaction_impl::accept(delivery &t) {
+ t.settle();
+}
+
+void transaction_impl::update(tracker &t, uint64_t state) {
Review Comment:
I think this should be delivery not tracker
##########
cpp/include/proton/session.hpp:
##########
@@ -105,14 +106,30 @@ PN_CPP_CLASS_EXTERN session : public
internal::object<pn_session_t>, public endp
/// Get user data from this session.
PN_CPP_EXTERN void* user_data() const;
+ PN_CPP_EXTERN void declare_transaction(proton::transaction_handler
&handler, bool settle_before_discharge = false);
+
+ PN_CPP_EXTERN bool txn_is_empty();
+ PN_CPP_EXTERN bool txn_is_declared();
+ PN_CPP_EXTERN void txn_commit();
+ PN_CPP_EXTERN void txn_abort();
+ PN_CPP_EXTERN void txn_declare();
+ PN_CPP_EXTERN void txn_handle_outcome(proton::tracker);
+ PN_CPP_EXTERN proton::tracker txn_send(proton::sender s, proton::message
msg);
+ PN_CPP_EXTERN void txn_accept(delivery &t);
+ PN_CPP_EXTERN proton::connection txn_connection() const;
+
/// @cond INTERNAL
friend class internal::factory<session>;
friend class session_iterator;
+ friend class transaction_impl;
Review Comment:
I'd think it's more likely that transaction_impl needs to a friend of
session_impl
##########
cpp/include/proton/target_options.hpp:
##########
@@ -60,6 +60,10 @@ class target_options {
/// address is ignored if dynamic() is true.
PN_CPP_EXTERN target_options& address(const std::string& addr);
+ /// Set the target be of type coordinator.
+ /// This immediately override the currently assigned type.
+ PN_CPP_EXTERN target_options& mark_coordinator();
Review Comment:
Need a better name: set_coordinator? make_coordinator? It's possible this
doesn't need to be a user visible option.
##########
cpp/src/proactor_container_impl.cpp:
##########
@@ -26,6 +26,7 @@
#include "proton/listener.hpp"
#include "proton/reconnect_options.hpp"
#include "proton/ssl.hpp"
+#include "proton/target_options.hpp"
Review Comment:
Change Not needed
##########
cpp/src/messaging_adapter.cpp:
##########
@@ -116,7 +119,29 @@ void on_delivery(messaging_handler& handler, pn_event_t*
event) {
link_context& lctx = link_context::get(lnk);
Tracing& ot = Tracing::getTracing();
- if (pn_link_is_receiver(lnk)) {
+ if (pn_terminus_get_type(pn_link_remote_target(lnk)) == PN_COORDINATOR) {
+ if (pn_delivery_updated(dlv)) {
+ tracker t(make_wrapper<tracker>(dlv));
+ ot.on_settled_span(t);
+ switch (pn_delivery_remote_state(dlv)) {
+ case PN_ACCEPTED:
+ handler.on_tracker_accept(t);
+ break;
+ case PN_REJECTED:
+ handler.on_tracker_reject(t);
+ break;
+ case PN_RELEASED:
+ case PN_MODIFIED:
+ handler.on_tracker_release(t);
+ break;
+ }
+ if (t.settled()) {
+ handler.on_tracker_settle(t);
+ if (lctx.auto_settle)
+ t.settle();
+ }
+ }
+ } else if (pn_link_is_receiver(lnk)) {
Review Comment:
I don't understand why the handling of messages on a transaction coordinator
link should be different from any other link.
##########
cpp/src/proactor_container_impl.cpp:
##########
@@ -34,11 +35,15 @@
#include "proton/proactor.h"
#include "proton/transport.h"
+#include "proton/delivery.h"
+
#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "reconnect_options_impl.hpp"
#include "proton_bits.hpp"
+#include <proton/types.hpp>
+
Review Comment:
Change not needed
##########
cpp/src/proactor_container_impl.cpp:
##########
@@ -34,11 +35,15 @@
#include "proton/proactor.h"
#include "proton/transport.h"
+#include "proton/delivery.h"
+
Review Comment:
Change not needed
##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,164 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction_handler.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::sender sender;
+ std::string url;
+ int total;
+ int batch_size;
+ int sent;
+ int batch_index = 0;
+ int current_batch = 0;
+ int committed = 0;
+
+ public:
+ tx_send(const std::string &s, int c, int b):
+ url(s), total(c), batch_size(b), sent(0) {}
+
+ void on_container_start(proton::container &c) override {
+ sender = c.open_sender(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ std::cout << "New session is open, declaring transaction now..." <<
std::endl;
+ s.declare_transaction(*this);
+ }
+
+ void on_transaction_declare_failed(proton::session s) {
+ std::cout << "Transaction declarion failed" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_commit_failed(proton::session s) {
+ std::cout << "Transaction commit failed!" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_declared(proton::session s) override {
+ std::cout << "Transaction is declared" << std::endl;
+ send();
+ }
+
+ void on_sendable(proton::sender&) override {
+ send();
+ }
+
+ void send() {
+ std::atomic<int> unique_id(10000);
+ proton::session session = sender.session();
+ while (session.txn_is_declared() && sender.credit() &&
+ (committed + current_batch) < total) {
+ proton::message msg;
+ std::map<std::string, int> m;
+ m["sequence"] = committed + current_batch;
+
+ msg.id(std::atomic_fetch_add(&unique_id, 1));
+ msg.body(m);
+ std::cout << "Sending: " << msg << std::endl;
+ session.txn_send(sender, msg);
Review Comment:
I think the API we were aiming for would just use a sender object created
from the transactioned session.
> [c++] Support for transactions
> ------------------------------
>
> Key: PROTON-1442
> URL: https://issues.apache.org/jira/browse/PROTON-1442
> Project: Qpid Proton
> Issue Type: Improvement
> Components: cpp-binding
> Reporter: Radim Kubis
> Assignee: Rakhi Kumari
> Priority: Major
>
> Support for transactions in Qpid Proton C++.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]