This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push: new 596696a PROTON-2308: Add support for setting Dynamic Node Properties 596696a is described below commit 596696aede355d1e53832a41fbebb59d0b71d5f1 Author: Rakhi Kumari <rakhi.c...@gmail.com> AuthorDate: Wed Dec 8 16:54:01 2021 +0530 PROTON-2308: Add support for setting Dynamic Node Properties [Test fixed up by committer] Closes #346 --- cpp/include/proton/source_options.hpp | 4 ++ cpp/include/proton/target_options.hpp | 4 ++ cpp/include/proton/terminus.hpp | 5 ++ cpp/src/delivery_test.cpp | 12 ++-- cpp/src/link_test.cpp | 118 +++++++++++++++++++++++++++++++++- cpp/src/node_options.cpp | 23 +++++++ cpp/src/terminus.cpp | 12 +++- 7 files changed, 168 insertions(+), 10 deletions(-) diff --git a/cpp/include/proton/source_options.hpp b/cpp/include/proton/source_options.hpp index fe1c34a..b221d72 100644 --- a/cpp/include/proton/source_options.hpp +++ b/cpp/include/proton/source_options.hpp @@ -27,6 +27,7 @@ #include "./duration.hpp" #include "./source.hpp" +#include <map> #include <string> /// @file @@ -92,6 +93,9 @@ class source_options { /// Extension capabilities that are supported/requested PN_CPP_EXTERN source_options& capabilities(const std::vector<symbol>&); + /// Set the dynamic node properties. + PN_CPP_EXTERN source_options& dynamic_properties(const std::map<symbol, value>&); + private: void apply(source&) const; diff --git a/cpp/include/proton/target_options.hpp b/cpp/include/proton/target_options.hpp index 834a185..1e24051 100644 --- a/cpp/include/proton/target_options.hpp +++ b/cpp/include/proton/target_options.hpp @@ -27,6 +27,7 @@ #include "./duration.hpp" #include "./target.hpp" +#include <map> #include <string> /// @file @@ -83,6 +84,9 @@ class target_options { /// Extension capabilities that are supported/requested PN_CPP_EXTERN target_options& capabilities(const std::vector<symbol>&); + /// Set the dynamic node properties. 
+ PN_CPP_EXTERN target_options& dynamic_properties(const std::map<symbol, value>&); + private: void apply(target&) const; diff --git a/cpp/include/proton/terminus.hpp b/cpp/include/proton/terminus.hpp index 5b9c684..ca10d2c 100644 --- a/cpp/include/proton/terminus.hpp +++ b/cpp/include/proton/terminus.hpp @@ -27,6 +27,7 @@ #include <proton/terminus.h> +#include <map> #include <string> #include <vector> @@ -103,6 +104,10 @@ class terminus { /// Extension capabilities that are supported/requested PN_CPP_EXTERN std::vector<symbol> capabilities() const; + /// Obtain the AMQP dynamic node properties for the + /// terminus as a standard map. + PN_CPP_EXTERN std::map<symbol, value> dynamic_properties() const; + protected: pn_terminus_t *pn_object() const { return object_; } private: diff --git a/cpp/src/delivery_test.cpp b/cpp/src/delivery_test.cpp index af6441c..9765223 100644 --- a/cpp/src/delivery_test.cpp +++ b/cpp/src/delivery_test.cpp @@ -56,7 +56,7 @@ int tracker_settle_counter; proton::binary test_tag("TESTTAG"); } // namespace -class test_recv : public proton::messaging_handler { +class test_server : public proton::messaging_handler { private: class listener_ready_handler : public proton::listen_handler { void on_open(proton::listener &l) override { @@ -74,7 +74,7 @@ class test_recv : public proton::messaging_handler { listener_ready_handler listen_handler; public: - test_recv(const std::string &s) : url(s) {} + test_server (const std::string &s) : url(s) {} void on_container_start(proton::container &c) override { listener = c.listen(url, listen_handler); @@ -88,13 +88,13 @@ class test_recv : public proton::messaging_handler { } }; -class test_send : public proton::messaging_handler { +class test_client : public proton::messaging_handler { private: std::string url; proton::sender sender; public: - test_send(const std::string &s) : url(s) {} + test_client (const std::string &s) : url(s) {} void on_container_start(proton::container &c) override { 
proton::connection_options co; @@ -125,7 +125,7 @@ int test_delivery_tag() { tracker_settle_counter = 0; std::string recv_address("127.0.0.1:0/test"); - test_recv recv(recv_address); + test_server recv(recv_address); proton::container c(recv); std::thread thread_recv([&c]() -> void { c.run(); }); @@ -135,7 +135,7 @@ int test_delivery_tag() { std::string send_address = "127.0.0.1:" + std::to_string(listener_port) + "/test"; - test_send send(send_address); + test_client send(send_address); proton::container(send).run(); thread_recv.join(); diff --git a/cpp/src/link_test.cpp b/cpp/src/link_test.cpp index 18188c6..cd9d931 100644 --- a/cpp/src/link_test.cpp +++ b/cpp/src/link_test.cpp @@ -20,12 +20,125 @@ #include "test_bits.hpp" -#include <proton/sender_options.hpp> -#include <proton/receiver_options.hpp> +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> #include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/listen_handler.hpp> +#include <proton/listener.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender_options.hpp> +#include <proton/source_options.hpp> +#include <proton/target_options.hpp> +#include <proton/types.hpp> #include <iostream> +#include <condition_variable> +#include <map> +#include <mutex> +#include <thread> + +typedef std::map<proton::symbol, proton::value> property_map; + +namespace { +std::mutex m; +std::condition_variable cv; +bool listener_ready = false; +int listener_port; +const std::string DYNAMIC_ADDRESS = "test_dynamic_address"; +} // namespace + +class test_server : public proton::messaging_handler { + private: + class listener_ready_handler : public proton::listen_handler { + void on_open(proton::listener& l) override { + { + std::lock_guard<std::mutex> lk(m); + listener_port = l.port(); + listener_ready = true; + } + cv.notify_one(); + } + }; + + std::string url; + proton::listener listener; + 
listener_ready_handler listen_handler; + + public: + test_server (const std::string& s) : url(s) {} + + void on_container_start(proton::container& c) override { + listener = c.listen(url, listen_handler); + } + + void on_sender_open(proton::sender& s) override { + ASSERT(s.source().dynamic()); + ASSERT(s.source().address().empty()); + property_map p = {{proton::symbol("supported-dist-modes"), proton::symbol("copy")}}; + ASSERT_EQUAL(s.source().dynamic_properties(), p); + + proton::source_options opts; + opts.address(DYNAMIC_ADDRESS) + // This fails due to a bug in the C++ bindings - PROTON-2480 + // .dynamic(true) + .dynamic_properties( + property_map{{proton::symbol("supported-dist-modes"), proton::symbol("move")}} + ); + s.open(proton::sender_options().source(opts)); + listener.stop(); + } +}; + +class test_client : public proton::messaging_handler { + private: + std::string url; + + public: + test_client (const std::string& s) : url(s) {} + + void on_container_start(proton::container& c) override { + proton::source_options opts; + opts.dynamic(true) + .dynamic_properties( + property_map{{proton::symbol{"supported-dist-modes"}, proton::symbol{"copy"}}} + ); + c.open_receiver(url, proton::receiver_options().source(opts)); + } + + void on_receiver_open(proton::receiver& r) override { + // This fails due to a bug in the c++ bindings - PROTON-2480 + // ASSERT(r.source().dynamic()); + ASSERT_EQUAL(DYNAMIC_ADDRESS, r.source().address()); + property_map m({{proton::symbol("supported-dist-modes"), proton::symbol("move")}}); + ASSERT_EQUAL(m, r.source().dynamic_properties()); + + r.connection().close(); + } +}; + +int test_dynamic_node_properties() { + + std::string recv_address("127.0.0.1:0"); + test_server server(recv_address); + proton::container c(server); + std::thread thread_recv([&c]() -> void { c.run(); }); + + // wait until listener is ready + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, [] { return listener_ready; }); + + std::string send_address = 
"127.0.0.1:" + std::to_string(listener_port); + test_client client(send_address); + proton::container(client).run(); + thread_recv.join(); + + return 0; +} + int test_link_name() { proton::container c; @@ -48,5 +161,6 @@ int test_link_name() int main(int argc, char** argv) { int failed = 0; RUN_ARGV_TEST(failed, test_link_name()); + RUN_ARGV_TEST(failed, test_dynamic_node_properties()); return failed; } diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp index 3b6d197..0e4d8a7 100644 --- a/cpp/src/node_options.cpp +++ b/cpp/src/node_options.cpp @@ -20,14 +20,17 @@ */ #include "proton/codec/vector.hpp" +#include "proton/map.hpp" #include "proton/source.hpp" #include "proton/source_options.hpp" #include "proton/target.hpp" #include "proton/target_options.hpp" +#include "proton/types.hpp" #include "proton_bits.hpp" #include <limits> +#include <map> namespace proton { @@ -97,6 +100,7 @@ class source_options::impl { option<enum source::distribution_mode> distribution_mode; option<source::filter_map> filters; option<std::vector<symbol> > capabilities; + option<std::map<symbol, value>> dynamic_properties; void apply(source& s) { node_address(s, address, dynamic, anonymous); @@ -111,6 +115,11 @@ class source_options::impl { if (capabilities.set) { value(pn_terminus_capabilities(unwrap(s))) = capabilities.value; } + if (dynamic_properties.set) { + map<symbol, value> source_map; + get(dynamic_properties.value, source_map); + value(pn_terminus_properties(unwrap(s))) = source_map; + } } }; @@ -134,6 +143,10 @@ source_options& source_options::expiry_policy(enum source::expiry_policy m) { im source_options& source_options::distribution_mode(enum source::distribution_mode m) { impl_->distribution_mode = m; return *this; } source_options& source_options::filters(const source::filter_map &map) { impl_->filters = map; return *this; } source_options& source_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; } +source_options& 
source_options::dynamic_properties(const std::map<symbol, value>& c) { + impl_->dynamic_properties = c; + return *this; +} void source_options::apply(source& s) const { impl_->apply(s); } @@ -148,6 +161,7 @@ class target_options::impl { option<duration> timeout; option<enum target::expiry_policy> expiry_policy; option<std::vector<symbol> > capabilities; + option<std::map<symbol, value>> dynamic_properties; void apply(target& t) { node_address(t, address, dynamic, anonymous); @@ -156,6 +170,11 @@ class target_options::impl { if (capabilities.set) { value(pn_terminus_capabilities(unwrap(t))) = capabilities.value; } + if (dynamic_properties.set) { + map<symbol, value> target_map; + get(dynamic_properties.value, target_map); + value(pn_terminus_properties(unwrap(t))) = target_map; + } } }; @@ -177,6 +196,10 @@ target_options& target_options::durability_mode(enum target::durability_mode m) target_options& target_options::timeout(duration d) { impl_->timeout = d; return *this; } target_options& target_options::expiry_policy(enum target::expiry_policy m) { impl_->expiry_policy = m; return *this; } target_options& target_options::capabilities(const std::vector<symbol>& c) { impl_->capabilities = c; return *this; } +target_options& target_options::dynamic_properties(const std::map<symbol, value>& c) { + impl_->dynamic_properties = c; + return *this; +} void target_options::apply(target& s) const { impl_->apply(s); } diff --git a/cpp/src/terminus.cpp b/cpp/src/terminus.cpp index f04694f..5393bbe 100644 --- a/cpp/src/terminus.cpp +++ b/cpp/src/terminus.cpp @@ -19,10 +19,10 @@ * */ -#include "proton/duration.hpp" #include "proton/terminus.hpp" +#include "proton/duration.hpp" +#include "proton/types.hpp" #include "proton/value.hpp" -#include "proton/codec/vector.hpp" #include "proton_bits.hpp" @@ -61,4 +61,12 @@ std::vector<symbol> terminus::capabilities() const { return caps.empty() ? 
std::vector<symbol>() : caps.get<std::vector<symbol> >(); } +std::map<symbol, value> terminus::dynamic_properties() const { + value props(pn_terminus_properties(object_)); + std::map<symbol, value> properties; + if (!props.empty()) { + get(props, properties); + } + return properties; +} } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org