[
https://issues.apache.org/jira/browse/PROTON-1442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937227#comment-17937227
]
ASF GitHub Bot commented on PROTON-1442:
----------------------------------------
astitcher commented on code in PR #437:
URL: https://github.com/apache/qpid-proton/pull/437#discussion_r2006440001
##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::receiver receiver;
+ std::string url;
+ int expected;
+ int batch_size;
+ int current_batch = 0;
+ int committed = 0;
+
+ proton::session session;
+ public:
+ tx_recv(const std::string &s, int c, int b):
+ url(s), expected(c), batch_size(b) {}
+
+ void on_container_start(proton::container &c) override {
+ receiver = c.open_receiver(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ session = s;
+ std::cout << " [on_session_open] declare_txn started..." <<
std::endl;
+ s.declare_transaction(*this);
+ std::cout << " [on_session_open] declare_txn ended..." << std::endl;
+ }
+
+ void on_transaction_declare_failed(proton::session) {}
+ void on_transaction_commit_failed(proton::session s) {
+ std::cout << "Transaction Commit Failed" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_declared(proton::session s) override {
+ std::cout << "[on_transaction_declared] txn called " << (&s)
+ << std::endl;
+ receiver.add_credit(batch_size);
+ }
+
+ void on_message(proton::delivery &d, proton::message &msg) override {
+ std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() <<
std::endl;
+ session.txn_accept(d);
Review Comment:
session == d.session()
##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::receiver receiver;
+ std::string url;
+ int expected;
+ int batch_size;
+ int current_batch = 0;
+ int committed = 0;
+
+ proton::session session;
+ public:
+ tx_recv(const std::string &s, int c, int b):
+ url(s), expected(c), batch_size(b) {}
+
+ void on_container_start(proton::container &c) override {
+ receiver = c.open_receiver(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ session = s;
Review Comment:
Probably not necessary
##########
cpp/include/proton/target_options.hpp:
##########
@@ -101,6 +103,39 @@ class target_options {
/// @endcond
};
+class coordinator_options {
+ public:
+ /// Create an empty set of options.
+ PN_CPP_EXTERN coordinator_options();
+
+ /// Copy options.
+ PN_CPP_EXTERN coordinator_options(const coordinator_options&);
+
+ PN_CPP_EXTERN ~coordinator_options();
+
+ /// Copy options.
+ PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
+
+ /// Set the address for the coordinator. It is unset by default. The
+ /// address is ignored if dynamic() is true.
+ PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
+
+ /// **Unsettled API** Extension capabilities that are supported/requested
+ PN_CPP_EXTERN coordinator_options& capabilities(const
std::vector<symbol>&);
+
+ private:
+ void apply(coordinator&) const;
+
+ class impl;
+ std::unique_ptr<impl> impl_;
+
+ /// @cond INTERNAL
+ friend class coordinator;
+ friend class sender_options;
+ friend class receiver_options;
+ /// @endcond
+};
Review Comment:
As with `coordinator`, this should be in its own header file - but again, maybe
it doesn't need to be visible to the API user anyway.
##########
cpp/include/proton/container.hpp:
##########
@@ -326,6 +326,7 @@ class PN_CPP_CLASS_EXTERN container {
friend class receiver_options;
friend class sender_options;
friend class work_queue;
+ friend class transaction;
Review Comment:
Don't like this as `transaction` is not a user visible class, so it
shouldn't be mentioned in the API at all.
##########
cpp/include/proton/session.hpp:
##########
@@ -29,13 +29,15 @@
#include "./sender.hpp"
#include <string>
+#include <iostream>
/// @file
/// @copybrief proton::session
struct pn_session_t;
namespace proton {
+ class transaction_impl;
Review Comment:
Not sure you need this. I think the `friend class` declaration below is
sufficient.
##########
cpp/src/node_options.cpp:
##########
@@ -162,6 +162,7 @@ class target_options::impl {
option<enum target::expiry_policy> expiry_policy;
option<std::vector<symbol> > capabilities;
option<target::dynamic_property_map> dynamic_properties;
+ option<int> type;
Review Comment:
possibly replace this with `option<bool> is_coordinator;`?
##########
cpp/include/proton/fwd.hpp:
##########
@@ -52,9 +52,12 @@ class sender_options;
class session;
class session_options;
class source_options;
+class coordinator_options;
Review Comment:
Not sure this is really user visible - I don't think users currently need
access to coordinator termini.
##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::receiver receiver;
+ std::string url;
+ int expected;
+ int batch_size;
+ int current_batch = 0;
+ int committed = 0;
+
+ proton::session session;
+ public:
+ tx_recv(const std::string &s, int c, int b):
+ url(s), expected(c), batch_size(b) {}
+
+ void on_container_start(proton::container &c) override {
+ receiver = c.open_receiver(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ session = s;
+ std::cout << " [on_session_open] declare_txn started..." <<
std::endl;
+ s.declare_transaction(*this);
+ std::cout << " [on_session_open] declare_txn ended..." << std::endl;
+ }
+
+ void on_transaction_declare_failed(proton::session) {}
+ void on_transaction_commit_failed(proton::session s) {
+ std::cout << "Transaction Commit Failed" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_declared(proton::session s) override {
+ std::cout << "[on_transaction_declared] txn called " << (&s)
+ << std::endl;
+ receiver.add_credit(batch_size);
+ }
+
+ void on_message(proton::delivery &d, proton::message &msg) override {
+ std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() <<
std::endl;
+ session.txn_accept(d);
+ current_batch += 1;
+ if(current_batch == batch_size) {
+ }
+ }
+
+ void on_transaction_committed(proton::session s) override {
+ committed += current_batch;
+ current_batch = 0;
+ std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
+ if(committed == expected) {
+ std::cout << "All messages committed" << std::endl;
+ s.connection().close();
+ }
+ else {
+ session.declare_transaction(*this);
Review Comment:
session == s
##########
cpp/include/proton/target.hpp:
##########
@@ -65,6 +65,26 @@ class target : public terminus {
/// @endcond
};
+class coordinator : public terminus {
+ public:
+ /// Create an empty coordinator.
+ coordinator() = default;
+
+ /// The address of the coordinator.
+ PN_CPP_EXTERN std::string address() const;
+ private:
+ coordinator(pn_terminus_t* t);
+ coordinator(const sender&);
+ coordinator(const receiver&);
+
+
+ /// @cond INTERNAL
+ friend class proton::internal::factory<coordinator>;
+ friend class sender;
+ friend class receiver;
+ /// @endcond
+};
Review Comment:
Shouldn't this be in a new `coordinator.hpp` header if it is user visible at
all? I think it's likely though that this class currently doesn't need to be
user visible anyway, as it has no current use for a user of the API.
##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::sender sender;
+ std::string url;
+ int total;
+ int batch_size;
+ int sent;
+ int batch_index = 0;
+ int current_batch = 0;
+ int committed = 0;
+ int confirmed = 0;
+
+ proton::session session;
Review Comment:
As tx_recv: I don't think you need to store away the session.
##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::sender sender;
+ std::string url;
+ int total;
+ int batch_size;
+ int sent;
+ int batch_index = 0;
+ int current_batch = 0;
+ int committed = 0;
+ int confirmed = 0;
+
+ proton::session session;
+
+ public:
+ tx_send(const std::string &s, int c, int b):
+ url(s), total(c), batch_size(b), sent(0) {}
+
+ void on_container_start(proton::container &c) override {
+ sender = c.open_sender(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ session = s;
+ std::cout << " [on_session_open] declare_txn started..." <<
std::endl;
+ s.declare_transaction(*this);
+ std::cout << " [on_session_open] declare_txn ended..." << std::endl;
+ }
+
+ void on_transaction_declare_failed(proton::session) {}
+ void on_transaction_commit_failed(proton::session s) {
+ std::cout << "Transaction Commit Failed" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_declared(proton::session s) override {
+ std::cout << "[on_transaction_declared] Session: " << (&s)
+ << std::endl;
+ std::cout << "[on_transaction_declared] txn is_empty " <<
(s.txn_is_empty())
+ << "\t" << std::endl;
+ send(sender);
+ }
+
+ void on_sendable(proton::sender &s) override {
+ std::cout << " [OnSendable] session: " << &session
+ << std::endl;
+ send(s);
+ }
+
+ void send(proton::sender &s) {
+ static int unique_id = 10000;
+ while (session.txn_is_declared() && sender.credit() &&
+ (committed + current_batch) < total) {
+ proton::message msg;
+ std::map<std::string, int> m;
+ m["sequence"] = committed + current_batch;
+
+ msg.id(unique_id++);
+ msg.body(m);
+ std::cout << "[example] transaction send msg: " << msg
+ << std::endl;
+ session.txn_send(sender, msg);
+ current_batch += 1;
+ if(current_batch == batch_size)
+ {
+ std::cout << " >> Txn attempt commit" << std::endl;
+ if (batch_index % 2 == 0) {
+ session.txn_commit();
+ } else {
+ session.txn_abort();
+ }
+ batch_index++;
+ }
+ }
+ }
+
+ void on_tracker_accept(proton::tracker &t) override {
+ confirmed += 1;
+ std::cout << " [example] on_tracker_accept:" << confirmed
+ << std::endl;
+ }
+
+ void on_transaction_committed(proton::session s) override {
+ committed += current_batch;
+ current_batch = 0;
+ std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
+ if(committed == total) {
+ std::cout << "All messages committed" << std::endl;
+ s.connection().close();
+ }
+ else {
+ std::cout << "redlcaring txn " << std::endl;
+ session.declare_transaction(*this);
+ }
+ }
+
+ void on_transaction_aborted(proton::session s) override {
+ std::cout << "Meesages Aborted ....." << std::endl;
Review Comment:
Typo: "Meesages" should be "Messages".
##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::sender sender;
+ std::string url;
+ int total;
+ int batch_size;
+ int sent;
+ int batch_index = 0;
+ int current_batch = 0;
+ int committed = 0;
+ int confirmed = 0;
+
+ proton::session session;
+
+ public:
+ tx_send(const std::string &s, int c, int b):
+ url(s), total(c), batch_size(b), sent(0) {}
+
+ void on_container_start(proton::container &c) override {
+ sender = c.open_sender(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ session = s;
+ std::cout << " [on_session_open] declare_txn started..." <<
std::endl;
+ s.declare_transaction(*this);
+ std::cout << " [on_session_open] declare_txn ended..." << std::endl;
+ }
+
+ void on_transaction_declare_failed(proton::session) {}
+ void on_transaction_commit_failed(proton::session s) {
+ std::cout << "Transaction Commit Failed" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_declared(proton::session s) override {
+ std::cout << "[on_transaction_declared] Session: " << (&s)
+ << std::endl;
+ std::cout << "[on_transaction_declared] txn is_empty " <<
(s.txn_is_empty())
+ << "\t" << std::endl;
+ send(sender);
+ }
+
+ void on_sendable(proton::sender &s) override {
+ std::cout << " [OnSendable] session: " << &session
+ << std::endl;
+ send(s);
+ }
+
+ void send(proton::sender &s) {
+ static int unique_id = 10000;
+ while (session.txn_is_declared() && sender.credit() &&
+ (committed + current_batch) < total) {
+ proton::message msg;
+ std::map<std::string, int> m;
+ m["sequence"] = committed + current_batch;
+
+ msg.id(unique_id++);
+ msg.body(m);
+ std::cout << "[example] transaction send msg: " << msg
+ << std::endl;
+ session.txn_send(sender, msg);
+ current_batch += 1;
+ if(current_batch == batch_size)
+ {
+ std::cout << " >> Txn attempt commit" << std::endl;
+ if (batch_index % 2 == 0) {
+ session.txn_commit();
+ } else {
+ session.txn_abort();
+ }
+ batch_index++;
+ }
+ }
+ }
+
+ void on_tracker_accept(proton::tracker &t) override {
+ confirmed += 1;
+ std::cout << " [example] on_tracker_accept:" << confirmed
+ << std::endl;
+ }
+
+ void on_transaction_committed(proton::session s) override {
+ committed += current_batch;
+ current_batch = 0;
+ std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
+ if(committed == total) {
+ std::cout << "All messages committed" << std::endl;
+ s.connection().close();
+ }
+ else {
+ std::cout << "redlcaring txn " << std::endl;
Review Comment:
Typo: "redlcaring" should be "redeclaring".
##########
cpp/include/proton/transaction.hpp:
##########
@@ -0,0 +1,63 @@
+#ifndef PROTON_TRANSACTION_HPP
+#define PROTON_TRANSACTION_HPP
+
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include "./fwd.hpp"
+#include "./internal/export.hpp"
+#include "./sender.hpp"
+#include "./tracker.hpp"
+#include "./container.hpp"
+
+/// @file
+/// @copybrief proton::transaction
+
+namespace proton {
+
+class transaction_handler;
Review Comment:
Don't need a declaration immediately followed by its definition
##########
cpp/examples/tx_recv.cpp:
##########
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_recv : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::receiver receiver;
+ std::string url;
+ int expected;
+ int batch_size;
+ int current_batch = 0;
+ int committed = 0;
+
+ proton::session session;
Review Comment:
Not sure we need to store the session anywhere - I think it can always be
derived inside any callback
##########
cpp/include/proton/transaction.hpp:
##########
@@ -0,0 +1,63 @@
+#ifndef PROTON_TRANSACTION_HPP
Review Comment:
I think this file should be called `transaction_handler.hpp` as that is what
it defines
##########
cpp/include/proton/terminus.hpp:
##########
@@ -122,6 +122,7 @@ class terminus {
friend class internal::factory<terminus>;
friend class source;
friend class target;
+ friend class coordinator;
Review Comment:
I don't really like this especially if coordinator ends up being purely
internal to the binding
##########
cpp/examples/tx_send.cpp:
##########
@@ -0,0 +1,172 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/types.hpp>
+#include <proton/transaction.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include <chrono>
+#include <thread>
+
+class tx_send : public proton::messaging_handler, proton::transaction_handler {
+ private:
+ proton::sender sender;
+ std::string url;
+ int total;
+ int batch_size;
+ int sent;
+ int batch_index = 0;
+ int current_batch = 0;
+ int committed = 0;
+ int confirmed = 0;
+
+ proton::session session;
+
+ public:
+ tx_send(const std::string &s, int c, int b):
+ url(s), total(c), batch_size(b), sent(0) {}
+
+ void on_container_start(proton::container &c) override {
+ sender = c.open_sender(url);
+ }
+
+ void on_session_open(proton::session &s) override {
+ session = s;
+ std::cout << " [on_session_open] declare_txn started..." <<
std::endl;
+ s.declare_transaction(*this);
+ std::cout << " [on_session_open] declare_txn ended..." << std::endl;
+ }
+
+ void on_transaction_declare_failed(proton::session) {}
+ void on_transaction_commit_failed(proton::session s) {
+ std::cout << "Transaction Commit Failed" << std::endl;
+ s.connection().close();
+ exit(-1);
+ }
+
+ void on_transaction_declared(proton::session s) override {
+ std::cout << "[on_transaction_declared] Session: " << (&s)
+ << std::endl;
+ std::cout << "[on_transaction_declared] txn is_empty " <<
(s.txn_is_empty())
+ << "\t" << std::endl;
+ send(sender);
+ }
+
+ void on_sendable(proton::sender &s) override {
+ std::cout << " [OnSendable] session: " << &session
+ << std::endl;
+ send(s);
+ }
+
+ void send(proton::sender &s) {
+ static int unique_id = 10000;
Review Comment:
I think using `static` like this is bad practice in example code. It won't
be thread safe if this code is copied into user code which uses multiple
sending connections. Since we've got C++17 you could use `std::atomic_int` and
`std::atomic_fetch_add`
##########
cpp/include/proton/fwd.hpp:
##########
@@ -52,9 +52,12 @@ class sender_options;
class session;
class session_options;
class source_options;
+class coordinator_options;
class ssl;
class target_options;
class tracker;
+class transaction;
Review Comment:
There is no user visible transaction class, so this can't be necessary.
##########
cpp/include/proton/target.hpp:
##########
@@ -65,6 +65,26 @@ class target : public terminus {
/// @endcond
};
+class coordinator : public terminus {
+ public:
+ /// Create an empty coordinator.
+ coordinator() = default;
+
+ /// The address of the coordinator.
+ PN_CPP_EXTERN std::string address() const;
+ private:
+ coordinator(pn_terminus_t* t);
+ coordinator(const sender&);
+ coordinator(const receiver&);
+
+
+ /// @cond INTERNAL
+ friend class proton::internal::factory<coordinator>;
+ friend class sender;
+ friend class receiver;
+ /// @endcond
+};
Review Comment:
As I said above I'm coming to think this might actually be just an internal
subclass of `target` or just represented as a transaction_coordinator boolean
on a target, set by a similarly named target_option.
##########
cpp/src/proton_bits.hpp:
##########
@@ -98,11 +102,15 @@ template <> struct wrapped<sender> { typedef pn_link_t
type; };
template <> struct wrapped<receiver> { typedef pn_link_t type; };
template <> struct wrapped<transfer> { typedef pn_delivery_t type; };
template <> struct wrapped<tracker> { typedef pn_delivery_t type; };
+template <> struct wrapped<disposition> {
+ typedef pn_disposition_t type;
+};
Review Comment:
Inconsistent formatting
##########
cpp/include/proton/sender_options.hpp:
##########
@@ -94,6 +94,9 @@ class sender_options {
/// Options for the receiver node of the receiver.
PN_CPP_EXTERN sender_options& target(const target_options&);
+ /// Options for the coordinator node of the receiver.
+ PN_CPP_EXTERN sender_options& coordinator(const coordinator_options&);
Review Comment:
I've now come to think that I was wrong earlier when I suggested that
`coordinator` is a peer class to `target` and `source`. Reading the spec more
carefully, I think that a transaction coordinator is a special kind of target
(that is how it's described in the spec) and could be modeled either as a
subclass of `target` or maybe just as a simple boolean `target_option`, say
`is_transaction_coordinator`.
##########
cpp/src/node_options.cpp:
##########
@@ -175,6 +176,9 @@ class target_options::impl {
get(dynamic_properties.value, target_map);
value(pn_terminus_properties(unwrap(t))) = target_map;
}
+ if (type.set) {
+ pn_terminus_set_type(unwrap(t), pn_terminus_type_t(type.value));
+ }
Review Comment:
use `is_coordinator`?
##########
cpp/src/proton_bits.hpp:
##########
@@ -111,6 +119,9 @@ template <> struct wrapper<pn_connection_t> { typedef
connection type; };
template <> struct wrapper<pn_session_t> { typedef session type; };
template <> struct wrapper<pn_link_t> { typedef link type; };
template <> struct wrapper<pn_delivery_t> { typedef transfer type; };
+template <> struct wrapper<pn_disposition_t> {
+ typedef disposition type;
+};
Review Comment:
Inconsistent formatting
##########
cpp/src/sender_options.cpp:
##########
@@ -66,6 +66,7 @@ class sender_options::impl {
option<bool> auto_settle;
option<source_options> source;
option<target_options> target;
+ option<coordinator_options> coordinator;
Review Comment:
Not needed if coordinator is a kind of target
##########
cpp/src/sender_options.cpp:
##########
@@ -82,6 +83,10 @@ class sender_options::impl {
proton::target
local_t(make_wrapper<proton::target>(pn_link_target(unwrap(s))));
target.value.apply(local_t);
}
+ if (coordinator.set) {
+ proton::coordinator
local_t(make_wrapper<proton::coordinator>(pn_link_target(unwrap(s))));
+ coordinator.value.apply(local_t);
+ }
Review Comment:
Ditto
##########
cpp/src/transaction.cpp:
##########
@@ -0,0 +1,44 @@
+/*
Review Comment:
Call this file transaction_handler.cpp
##########
cpp/include/proton/target_options.hpp:
##########
@@ -101,6 +103,39 @@ class target_options {
/// @endcond
};
+class coordinator_options {
+ public:
+ /// Create an empty set of options.
+ PN_CPP_EXTERN coordinator_options();
+
+ /// Copy options.
+ PN_CPP_EXTERN coordinator_options(const coordinator_options&);
+
+ PN_CPP_EXTERN ~coordinator_options();
+
+ /// Copy options.
+ PN_CPP_EXTERN coordinator_options& operator=(const coordinator_options&);
+
+ /// Set the address for the coordinator. It is unset by default. The
+ /// address is ignored if dynamic() is true.
+ PN_CPP_EXTERN coordinator_options& address(const std::string& addr);
+
+ /// **Unsettled API** Extension capabilities that are supported/requested
+ PN_CPP_EXTERN coordinator_options& capabilities(const
std::vector<symbol>&);
+
+ private:
+ void apply(coordinator&) const;
+
+ class impl;
+ std::unique_ptr<impl> impl_;
+
+ /// @cond INTERNAL
+ friend class coordinator;
+ friend class sender_options;
+ friend class receiver_options;
+ /// @endcond
+};
Review Comment:
Again, probably an internal subclass of `target` or not needed at all
because it's represented by a simple boolean in target.
##########
cpp/src/sender_options.cpp:
##########
@@ -118,6 +123,7 @@ sender_options&
sender_options::delivery_mode(proton::delivery_mode m) {impl_->d
sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b;
return *this; }
sender_options& sender_options::source(const source_options &s) {impl_->source
= s; return *this; }
sender_options& sender_options::target(const target_options &s) {impl_->target
= s; return *this; }
+sender_options& sender_options::coordinator(const coordinator_options &s)
{impl_->coordinator = s; return *this; }
Review Comment:
Ditto
> [c++] Support for transactions
> ------------------------------
>
> Key: PROTON-1442
> URL: https://issues.apache.org/jira/browse/PROTON-1442
> Project: Qpid Proton
> Issue Type: Improvement
> Components: cpp-binding
> Reporter: Radim Kubis
> Assignee: Rakhi Kumari
> Priority: Major
>
> Support for transactions in Qpid Proton C++.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]