Repository: qpid-proton Updated Branches: refs/heads/cjansen-cpp-client e0db6ea98 -> ef29b07c3
PROTON-865: Blocking sender functionality and handler per connection Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef29b07c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef29b07c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef29b07c Branch: refs/heads/cjansen-cpp-client Commit: ef29b07c3bbc11d30d24e34a8dc9d10a7af2d495 Parents: e0db6ea Author: Clifford Jansen <cliffjan...@apache.org> Authored: Wed May 20 07:54:03 2015 -0700 Committer: Clifford Jansen <cliffjan...@apache.org> Committed: Wed May 20 07:54:03 2015 -0700 ---------------------------------------------------------------------- proton-c/bindings/cpp/CMakeLists.txt | 7 + .../cpp/examples/HelloWorldBlocking.cpp | 65 +++++++ .../cpp/include/proton/cpp/BlockingConnection.h | 67 +++++++ .../cpp/include/proton/cpp/BlockingLink.h | 59 +++++++ .../cpp/include/proton/cpp/BlockingSender.h | 54 ++++++ .../cpp/include/proton/cpp/Connection.h | 2 +- .../bindings/cpp/include/proton/cpp/Container.h | 15 +- .../bindings/cpp/include/proton/cpp/Delivery.h | 4 +- .../bindings/cpp/include/proton/cpp/Duration.h | 56 ++++++ proton-c/bindings/cpp/include/proton/cpp/Link.h | 1 + .../cpp/include/proton/cpp/MessagingAdapter.h | 3 - .../cpp/include/proton/cpp/MessagingHandler.h | 6 + .../bindings/cpp/include/proton/cpp/Sender.h | 3 +- .../cpp/include/proton/cpp/WaitCondition.h | 45 +++++ proton-c/bindings/cpp/src/Connection.cpp | 4 +- proton-c/bindings/cpp/src/ConnectionImpl.cpp | 28 ++- proton-c/bindings/cpp/src/ConnectionImpl.h | 3 +- proton-c/bindings/cpp/src/Container.cpp | 25 ++- proton-c/bindings/cpp/src/ContainerImpl.cpp | 175 +++++++++++-------- proton-c/bindings/cpp/src/ContainerImpl.h | 24 ++- proton-c/bindings/cpp/src/Duration.cpp | 55 ++++++ proton-c/bindings/cpp/src/Link.cpp | 4 + proton-c/bindings/cpp/src/MessagingAdapter.cpp | 6 +- proton-c/bindings/cpp/src/MessagingHandler.cpp | 57 +++++- proton-c/bindings/cpp/src/Sender.cpp | 3 +- .../cpp/src/blocking/BlockingConnection.cpp | 62 +++++++ .../cpp/src/blocking/BlockingConnectionImpl.cpp | 124 +++++++++++++ .../cpp/src/blocking/BlockingConnectionImpl.h | 63 +++++++ .../bindings/cpp/src/blocking/BlockingLink.cpp | 86 +++++++++ .../cpp/src/blocking/BlockingSender.cpp | 66 +++++++ 30 files changed, 1060 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index fad26ea..2506203 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -40,6 +40,7 @@ set (qpid-proton-cpp-core src/Terminus.cpp src/Acceptor.cpp src/Url.cpp + src/Duration.cpp src/Message.cpp src/MessagingAdapter.cpp src/MessagingEvent.cpp @@ -55,6 +56,10 @@ set (qpid-proton-cpp-core src/Logger.cpp src/contexts.cpp src/exceptions.cpp + src/blocking/BlockingConnection.cpp + src/blocking/BlockingConnectionImpl.cpp + src/blocking/BlockingLink.cpp + src/blocking/BlockingSender.cpp ) #set_source_files_properties ( @@ -102,6 +107,8 @@ add_executable (SimpleSend examples/SimpleSend.cpp) target_link_libraries (SimpleSend qpid-proton-cpp) add_executable (Broker examples/Broker.cpp) target_link_libraries (Broker qpid-proton-cpp) +add_executable (HelloWorldBlocking examples/HelloWorldBlocking.cpp) +target_link_libraries (HelloWorldBlocking qpid-proton-cpp) install (TARGETS qpid-proton-cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp b/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp new file mode 100644 index 0000000..a3f729c --- /dev/null +++ b/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Container.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/BlockingSender.h" + +#include <iostream> + + +using namespace proton::reactor; + +class HelloWorldBlocking : public MessagingHandler { + private: + std::string server; + std::string address; + public: + + HelloWorldBlocking(const std::string &s, const std::string &addr) : server(s), address(addr) {} + + void onStart(Event &e) { + Connection conn = e.getContainer().connect(server); + e.getContainer().createReceiver(conn, address); + } + + void onMessage(Event &e) { + std::string body = e.getMessage().getBody(); + std::cout << body << std::endl; + e.getConnection().close(); + } + +}; + +int main(int argc, char **argv) { + std::string url("localhost:5672"); + std::string addr("examples"); + BlockingConnection conn = BlockingConnection(url); + BlockingSender sender = conn.createSender(addr); + Message m; + m.setBody("Hello World!"); + sender.send(m); + conn.close(); + + // Temporary hack until blocking receiver available + HelloWorldBlocking hw("localhost:5672", "examples"); + Container(hw).run(); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h new file mode 100644 index 0000000..aa268db --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h @@ -0,0 +1,67 @@ +#ifndef PROTON_CPP_BLOCKINGCONNECTION_H +#define PROTON_CPP_BLOCKINGCONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/Handle.h" +#include "proton/cpp/Endpoint.h" +#include "proton/cpp/Container.h" +#include "proton/cpp/Duration.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/types.h" +#include <string> + +struct pn_connection_t; + +namespace proton { +namespace reactor { + +class Container; +class BlockingConnectionImpl; +class SslDomain; +class BlockingSender; +class WaitCondition; + +class BlockingConnection : public Handle<BlockingConnectionImpl> +{ + public: + PROTON_CPP_EXTERN BlockingConnection(); + PROTON_CPP_EXTERN BlockingConnection(const BlockingConnection& c); + PROTON_CPP_EXTERN BlockingConnection& operator=(const BlockingConnection& c); + PROTON_CPP_EXTERN ~BlockingConnection(); + + PROTON_CPP_EXTERN BlockingConnection(std::string &url, Duration = Duration::FOREVER, + SslDomain *ssld=0, Container *c=0); + PROTON_CPP_EXTERN void close(); + + PROTON_CPP_EXTERN BlockingSender createSender(std::string &address, Handler *h=0); + PROTON_CPP_EXTERN void wait(WaitCondition &condition); + PROTON_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout=Duration::FOREVER); + PROTON_CPP_EXTERN Duration getTimeout(); + private: + friend class PrivateImplRef<BlockingConnection>; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_BLOCKINGCONNECTION_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h new file mode 100644 index 0000000..7f84ce8 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h @@ -0,0 +1,59 @@ +#ifndef PROTON_CPP_BLOCKINGLINK_H +#define PROTON_CPP_BLOCKINGLINK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/Handle.h" +#include "proton/cpp/Endpoint.h" +#include "proton/cpp/Container.h" +#include "proton/cpp/Duration.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/BlockingConnection.h" +#include "proton/types.h" +#include <string> + +namespace proton { +namespace reactor { + +class BlockingConnection; + +class BlockingLink +{ + public: + PROTON_CPP_EXTERN void close(); + ~BlockingLink(); + protected: + PROTON_CPP_EXTERN BlockingLink(BlockingConnection *c, pn_link_t *l); + PROTON_CPP_EXTERN void waitForClosed(Duration timeout=Duration::SECOND); + private: + BlockingConnection connection; + Link link; + void checkClosed(); + friend class BlockingConnection; + friend class BlockingSender; + friend class BlockingReceiver; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_BLOCKINGLINK_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h new file mode 100644 index 0000000..d4ddeae --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h @@ -0,0 +1,54 @@ +#ifndef PROTON_CPP_BLOCKINGSENDER_H +#define PROTON_CPP_BLOCKINGSENDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/Handle.h" +#include "proton/cpp/Endpoint.h" +#include "proton/cpp/Container.h" +#include "proton/cpp/Duration.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/BlockingLink.h" +#include "proton/types.h" +#include "proton/delivery.h" +#include <string> + +namespace proton { +namespace reactor { + +class BlockingConnection; +class BlockingLink; + +class BlockingSender : public BlockingLink +{ + public: + PROTON_CPP_EXTERN Delivery send(Message &msg); + PROTON_CPP_EXTERN Delivery send(Message &msg, Duration timeout); + private: + PROTON_CPP_EXTERN BlockingSender(BlockingConnection &c, Sender &l); + friend class BlockingConnection; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_BLOCKINGSENDER_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Connection.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Connection.h b/proton-c/bindings/cpp/include/proton/cpp/Connection.h index 86abbe6..f3397ce 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Connection.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Connection.h @@ -47,7 +47,7 @@ class Connection : public Endpoint, public Handle<ConnectionImpl> PROTON_CPP_EXTERN Connection& operator=(const Connection& c); PROTON_CPP_EXTERN ~Connection(); - PROTON_CPP_EXTERN Connection(Container &c); + PROTON_CPP_EXTERN Connection(Container &c, Handler *h = 0); PROTON_CPP_EXTERN Transport &getTransport(); PROTON_CPP_EXTERN Handler *getOverride(); PROTON_CPP_EXTERN void setOverride(Handler *h); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Container.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Container.h b/proton-c/bindings/cpp/include/proton/cpp/Container.h index d596ab1..1d7284d 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Container.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Container.h @@ -24,6 +24,7 @@ #include "proton/cpp/ImportExport.h" #include "proton/cpp/Handle.h" #include "proton/cpp/Acceptor.h" +#include "proton/cpp/Duration.h" #include <proton/reactor.h> #include <string> @@ -39,6 +40,7 @@ class MessagingHandler; class Sender; class Receiver; class Link; + class Handler; class Container : public Handle<ContainerImpl> { @@ -48,17 +50,24 @@ class Container : public Handle<ContainerImpl> PROTON_CPP_EXTERN Container& operator=(const Container& c); PROTON_CPP_EXTERN ~Container(); + PROTON_CPP_EXTERN Container(); PROTON_CPP_EXTERN Container(MessagingHandler &mhandler); - PROTON_CPP_EXTERN Connection connect(std::string &host); + PROTON_CPP_EXTERN Connection connect(std::string &host, Handler *h=0); PROTON_CPP_EXTERN void run(); + PROTON_CPP_EXTERN void start(); + PROTON_CPP_EXTERN bool process(); + PROTON_CPP_EXTERN void stop(); + PROTON_CPP_EXTERN void wakeup(); + PROTON_CPP_EXTERN bool isQuiesced(); PROTON_CPP_EXTERN pn_reactor_t *getReactor(); - PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler(); - PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr); + PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr, Handler *h=0); PROTON_CPP_EXTERN Sender createSender(std::string &url); PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr); PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); PROTON_CPP_EXTERN Acceptor listen(const std::string &url); PROTON_CPP_EXTERN std::string getContainerId(); + PROTON_CPP_EXTERN Duration getTimeout(); + PROTON_CPP_EXTERN void setTimeout(Duration timeout); private: friend class PrivateImplRef<Container>; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Delivery.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h index a1965f6..8171dd5 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h @@ -22,9 +22,11 @@ * */ #include "proton/cpp/ImportExport.h" -#include "proton/cpp/Link.h" +#include "proton/cpp/ProtonHandle.h" #include "ProtonImplRef.h" + +#include "proton/delivery.h" #include "proton/disposition.h" namespace proton { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Duration.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Duration.h b/proton-c/bindings/cpp/include/proton/cpp/Duration.h new file mode 100644 index 0000000..4e8f474 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/Duration.h @@ -0,0 +1,56 @@ +#ifndef PROTON_CPP_DURATION_H +#define PROTON_CPP_DURATION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/ImportExport.h" +#include "proton/types.h" + +namespace proton { +namespace reactor { + +/** \ingroup C++ + * A duration is a time in milliseconds. + */ +class Duration +{ + public: + PROTON_CPP_EXTERN explicit Duration(uint64_t milliseconds); + PROTON_CPP_EXTERN uint64_t getMilliseconds() const; + PROTON_CPP_EXTERN static const Duration FOREVER; + PROTON_CPP_EXTERN static const Duration IMMEDIATE; + PROTON_CPP_EXTERN static const Duration SECOND; + PROTON_CPP_EXTERN static const Duration MINUTE; + private: + uint64_t milliseconds; +}; + +PROTON_CPP_EXTERN Duration operator*(const Duration& duration, + uint64_t multiplier); +PROTON_CPP_EXTERN Duration operator*(uint64_t multiplier, + const Duration& duration); +PROTON_CPP_EXTERN bool operator==(const Duration& a, const Duration& b); +PROTON_CPP_EXTERN bool operator!=(const Duration& a, const Duration& b); + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_DURATION_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Link.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Link.h b/proton-c/bindings/cpp/include/proton/cpp/Link.h index 265d80d..391e5fc 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Link.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Link.h @@ -50,6 +50,7 @@ class Link : public Endpoint, public ProtonHandle<pn_link_t> PROTON_CPP_EXTERN Terminus getTarget(); PROTON_CPP_EXTERN Terminus getRemoteSource(); PROTON_CPP_EXTERN Terminus getRemoteTarget(); + PROTON_CPP_EXTERN std::string getName(); PROTON_CPP_EXTERN pn_link_t *getPnLink() const; virtual PROTON_CPP_EXTERN Connection &getConnection(); PROTON_CPP_EXTERN Link getNext(Endpoint::State mask); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h index 36a92e4..280df5b 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h @@ -71,9 +71,6 @@ class MessagingAdapter : public MessagingHandler PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e); private: MessagingHandler &delegate; // The handler for generated MessagingEvent's - bool autoSettle; - bool autoAccept; - bool peerCloseIsError; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h index 89d582c..b157360 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h @@ -30,6 +30,7 @@ namespace proton { namespace reactor { class Event; +class MessagingAdapter; class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking { @@ -80,9 +81,14 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking bool autoSettle; bool autoAccept; bool peerCloseIsError; + MessagingAdapter *messagingAdapter; + Handler *flowController; + PROTON_CPP_EXTERN MessagingHandler(bool rawHandler, int prefetch=10, bool autoAccept=true, bool autoSettle=true, + bool peerCloseIsError=false); private: friend class ContainerImpl; friend class MessagingAdapter; + void createHelpers(); }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Sender.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Sender.h b/proton-c/bindings/cpp/include/proton/cpp/Sender.h index 9b8683d..c63161c 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Sender.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Sender.h @@ -22,6 +22,7 @@ * */ #include "proton/cpp/ImportExport.h" +#include "proton/cpp/Delivery.h" #include "proton/cpp/Link.h" #include "proton/cpp/Message.h" @@ -40,7 +41,7 @@ class Sender : public Link PROTON_CPP_EXTERN Sender(pn_link_t *lnk); PROTON_CPP_EXTERN Sender(); PROTON_CPP_EXTERN Sender(const Link& c); - PROTON_CPP_EXTERN void send(Message &m); + PROTON_CPP_EXTERN Delivery send(Message &m); protected: virtual void verifyType(pn_link_t *l); }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h b/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h new file mode 100644 index 0000000..f4c7cb5 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h @@ -0,0 +1,45 @@ +#ifndef PROTON_CPP_WAITCONDITION_H +#define PROTON_CPP_WAITCONDITION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" + +namespace proton { +namespace reactor { + +// Interface class to indicates that an expected contion has been +// achieved, i.e. for BlockingConnection.wait() + +class WaitCondition +{ + public: + PROTON_CPP_EXTERN virtual ~WaitCondition(); + + // Overide this member function to indicate whether an expected + // condition is achieved and requires no further waiting. + virtual bool achieved() = 0; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_WAITCONDITION_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp index 1db8fbc..67e7d0c 100644 --- a/proton-c/bindings/cpp/src/Connection.cpp +++ b/proton-c/bindings/cpp/src/Connection.cpp @@ -42,8 +42,8 @@ Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::cop Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); } Connection::~Connection() { PI::dtor(*this); } -Connection::Connection(Container &c) { - ConnectionImpl *cimpl = new ConnectionImpl(c); +Connection::Connection(Container &c, Handler *h) { + ConnectionImpl *cimpl = new ConnectionImpl(c, h); PI::ctor(*this, cimpl); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ConnectionImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp index 9cadffe..f7cc5f9 100644 --- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp +++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp @@ -25,6 +25,8 @@ #include "proton/cpp/Transport.h" #include "Msg.h" #include "contexts.h" +#include "PrivateImplRef.h" +#include "ContainerImpl.h" #include "proton/connection.h" @@ -41,12 +43,25 @@ void ConnectionImpl::decref(ConnectionImpl *impl) { delete impl; } -ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t *pnConn) : container(c), refCount(0), override(0), transport(0), defaultSession(0), - pnConnection(pnConn), - reactorReference(this) +ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t &pnConn) + : container(c), refCount(0), override(0), transport(0), defaultSession(0), + pnConnection(&pnConn), reactorReference(this) { - if (!pnConnection) - pnConnection = pn_reactor_connection(container.getReactor(), NULL); + setConnectionContext(pnConnection, this); +} + +ConnectionImpl::ConnectionImpl(Container &c, Handler *handler) + : container(c), refCount(0), override(0), transport(0), defaultSession(0), + reactorReference(this) +{ + pn_handler_t *chandler = 0; + if (handler) { + ContainerImpl *containerImpl = PrivateImplRef<Container>::get(c); + chandler = containerImpl->wrapHandler(handler); + } + pnConnection = pn_reactor_connection(container.getReactor(), chandler); + if (chandler) + pn_decref(chandler); setConnectionContext(pnConnection, this); } @@ -112,7 +127,7 @@ Connection &ConnectionImpl::getReactorReference(pn_connection_t *conn) { Container container(getContainerContext(reactor)); if (!container) // can't be one created by our container throw ProtonException(MSG("Unknown Proton connection specifier")); - impl = new ConnectionImpl(container, conn); + impl = new ConnectionImpl(container, *conn); } return impl->reactorReference; } @@ -121,5 +136,4 @@ Link ConnectionImpl::getLinkHead(Endpoint::State mask) { return Link(pn_link_head(pnConnection, mask)); } - }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ConnectionImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.h b/proton-c/bindings/cpp/src/ConnectionImpl.h index 11b5765..48210a3 100644 --- a/proton-c/bindings/cpp/src/ConnectionImpl.h +++ b/proton-c/bindings/cpp/src/ConnectionImpl.h @@ -39,7 +39,8 @@ class Container; class ConnectionImpl : public Endpoint { public: - PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t *pnConn = 0); + PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t &pnConn); + PROTON_CPP_EXTERN ConnectionImpl(Container &c, Handler *h = 0); PROTON_CPP_EXTERN ~ConnectionImpl(); PROTON_CPP_EXTERN Transport &getTransport(); PROTON_CPP_EXTERN Handler *getOverride(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Container.cpp b/proton-c/bindings/cpp/src/Container.cpp index 4eccb15..e72f484 100644 --- a/proton-c/bindings/cpp/src/Container.cpp +++ b/proton-c/bindings/cpp/src/Container.cpp @@ -53,17 +53,23 @@ Container::Container(MessagingHandler &mhandler) { PI::ctor(*this, cimpl); } -Connection Container::connect(std::string &host) { return impl->connect(host); } +Container::Container() { + ContainerImpl *cimpl = new ContainerImpl(); + PI::ctor(*this, cimpl); +} -pn_reactor_t *Container::getReactor() { return impl->getReactor(); } +Connection Container::connect(std::string &host, Handler *h) { return impl->connect(host, h); } -pn_handler_t *Container::getGlobalHandler() { return impl->getGlobalHandler(); } +pn_reactor_t *Container::getReactor() { return impl->getReactor(); } std::string Container::getContainerId() { return impl->getContainerId(); } +Duration Container::getTimeout() { return impl->getTimeout(); } +void Container::setTimeout(Duration timeout) { impl->setTimeout(timeout); } + -Sender Container::createSender(Connection &connection, std::string &addr) { - return impl->createSender(connection, addr); +Sender Container::createSender(Connection &connection, std::string &addr, Handler *h) { + return impl->createSender(connection, addr, h); } Sender Container::createSender(std::string &urlString) { @@ -83,8 +89,11 @@ Acceptor Container::listen(const std::string &urlString) { } -void Container::run() { - impl->run(); -} +void Container::run() { impl->run(); } +void Container::start() { impl->start(); } +bool Container::process() { return impl->process(); } +void Container::stop() { impl->stop(); } +void Container::wakeup() { impl->wakeup(); } +bool Container::isQuiesced() { return impl->isQuiesced(); } }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ContainerImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp index 0b75f1c..1424dbb 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.cpp +++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp @@ -116,41 +116,19 @@ class OverrideHandler : public Handler pn_handler_dispatch(baseHandler, cevent, (pn_event_type_t) type); if (conn && type == PN_CONNECTION_FINAL) { - // TODO: this must be the last acation of the last handler looking at + // TODO: this must be the last action of the last handler looking at // connection events. Better: generate a custom FINAL event (or task). Or move to // separate event streams per connection as part of multi threading support. ConnectionImpl *cimpl = getConnectionContext(conn); if (cimpl) cimpl->reactorDetach(); - // TODO: remember all connections and do reactorDetach of zombies connections - // not pn_connection_release'd at PN_REACTOR_FINAL. + // TODO: remember all connections and do reactorDetach of zombie connections + // not yet pn_connection_release'd at PN_REACTOR_FINAL. } } }; -class CFlowController : public ProtonHandler -{ - public: - pn_handler_t *flowcontroller; - - CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {} - ~CFlowController() { - pn_decref(flowcontroller); - } - - void redirect(Event &e) { - ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e); - pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType()); - } - - virtual void onLinkLocalOpen(Event &e) { redirect(e); } - virtual void onLinkRemoteOpen(Event &e) { redirect(e); } - virtual void onLinkFlow(Event &e) { redirect(e); } - virtual void onDelivery(Event &e) { redirect(e); } -}; - - namespace { // TODO: configurable policy. SessionPerConnection for now. @@ -165,7 +143,6 @@ Session getDefaultSession(pn_connection_t *conn, pn_session_t **ses) { struct InboundContext { ContainerImpl *containerImpl; - Container containerRef; // create only once for all inbound events Handler *cppHandler; }; @@ -174,11 +151,6 @@ ContainerImpl *getContainerImpl(pn_handler_t *c_handler) { return ctxt->containerImpl; } -Container &getContainerRef(pn_handler_t *c_handler) { - struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); - return ctxt->containerRef; -} - Handler &getCppHandler(pn_handler_t *c_handler) { struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); return *ctxt->cppHandler; @@ -186,21 +158,19 @@ Handler &getCppHandler(pn_handler_t *c_handler) { void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type) { - MessagingEvent mevent(cevent, type, getContainerRef(c_handler)); + Container c(getContainerImpl(c_handler)); // Ref counted per event, but when is the last event if stop() never called? + MessagingEvent mevent(cevent, type, c); mevent.dispatch(getCppHandler(c_handler)); } void cpp_handler_cleanup(pn_handler_t *c_handler) { - struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); - ctxt->containerRef.~Container(); } pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h) { pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup); struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler); - new (&ctxt->containerRef) Container(c); ctxt->containerImpl = c; ctxt->cppHandler = h; return handler; @@ -220,18 +190,29 @@ void ContainerImpl::decref(ContainerImpl *impl) { delete impl; } -ContainerImpl::ContainerImpl(MessagingHandler &mhandler) : - reactor(0), globalHandler(0), messagingHandler(mhandler), containerId(generateUuid()), +ContainerImpl::ContainerImpl(Handler &h) : + reactor(0), handler(&h), messagingAdapter(0), + overrideHandler(0), flowController(0), containerId(generateUuid()), refCount(0) -{ -} +{} + +ContainerImpl::ContainerImpl() : + reactor(0), handler(0), messagingAdapter(0), + overrideHandler(0), flowController(0), containerId(generateUuid()), + refCount(0) +{} -ContainerImpl::~ContainerImpl() {} +ContainerImpl::~ContainerImpl() { + delete overrideHandler; + delete flowController; + delete messagingAdapter; + pn_reactor_free(reactor); +} -Connection ContainerImpl::connect(std::string &host) { - if (!reactor) throw ProtonException(MSG("Container not initialized")); +Connection ContainerImpl::connect(std::string &host, Handler *h) { + if (!reactor) throw ProtonException(MSG("Container not started")); Container cntnr(this); - Connection connection(cntnr); + Connection connection(cntnr, handler); Connector *connector = new Connector(connection); // Connector self-deletes depending on reconnect logic connector->setAddress(host); // TODO: url vector @@ -242,15 +223,36 @@ Connection ContainerImpl::connect(std::string &host) { pn_reactor_t *ContainerImpl::getReactor() { return reactor; } -pn_handler_t *ContainerImpl::getGlobalHandler() { return globalHandler; } std::string ContainerImpl::getContainerId() { return containerId; } +Duration ContainerImpl::getTimeout() { + pn_millis_t tmo = pn_reactor_get_timeout(reactor); + if (tmo == PN_MILLIS_MAX) + return Duration::FOREVER; + return Duration(tmo); +} -Sender ContainerImpl::createSender(Connection &connection, std::string &addr) { +void ContainerImpl::setTimeout(Duration timeout) { + if (timeout == Duration::FOREVER || timeout.getMilliseconds() > PN_MILLIS_MAX) + pn_reactor_set_timeout(reactor, PN_MILLIS_MAX); + else { + pn_millis_t tmo = timeout.getMilliseconds(); + pn_reactor_set_timeout(reactor, tmo); + } +} + + +Sender ContainerImpl::createSender(Connection &connection, std::string &addr, Handler *h) { + if (!reactor) throw ProtonException(MSG("Container not started")); Session session = getDefaultSession(connection.getPnConnection(), &getImpl(connection)->defaultSession); Sender snd = session.createSender(containerId + '-' + addr); - pn_terminus_set_address(pn_link_target(snd.getPnLink()), addr.c_str()); + pn_link_t *lnk = snd.getPnLink(); + pn_terminus_set_address(pn_link_target(lnk), addr.c_str()); + if (h) { + pn_record_t *record = pn_link_attachments(lnk); + pn_record_set_handler(record, wrapHandler(h)); + } snd.open(); ConnectionImpl *connImpl = getImpl(connection); @@ -258,7 +260,8 @@ Sender ContainerImpl::createSender(Connection &connection, std::string &addr) { } Sender ContainerImpl::createSender(std::string &urlString) { - Connection conn = connect(urlString); + if (!reactor) throw ProtonException(MSG("Container not started")); + Connection conn = connect(urlString, 0); Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession); std::string path = Url(urlString).getPath(); Sender snd = session.createSender(containerId + '-' + path); @@ -270,6 +273,7 @@ Sender ContainerImpl::createSender(std::string &urlString) { } Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr) { + if (!reactor) throw ProtonException(MSG("Container not started")); ConnectionImpl *connImpl = getImpl(connection); Session session = getDefaultSession(connImpl->pnConnection, &connImpl->defaultSession); Receiver rcv = session.createReceiver(containerId + '-' + addr); @@ -279,8 +283,9 @@ Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr } Receiver ContainerImpl::createReceiver(const std::string &urlString) { + if (!reactor) throw ProtonException(MSG("Container not started")); // TODO: const cleanup of API - Connection conn = connect(const_cast<std::string &>(urlString)); + Connection conn = connect(const_cast<std::string &>(urlString), 0); Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession); std::string path = Url(urlString).getPath(); Receiver rcv = session.createReceiver(containerId + '-' + path); @@ -298,50 +303,76 @@ Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &por } Acceptor ContainerImpl::listen(const std::string &urlString) { + if (!reactor) throw ProtonException(MSG("Container not started")); Url url(urlString); // TODO: SSL return acceptor(url.getHost(), url.getPort()); } -void ContainerImpl::run() { +pn_handler_t *ContainerImpl::wrapHandler(Handler *h) { + return cpp_handler(this, h); +} + + +void ContainerImpl::initializeReactor() { + if (reactor) throw ProtonException(MSG("Container already running")); reactor = pn_reactor(); // Set our context on the reactor setContainerContext(reactor, this); - int prefetch = messagingHandler.prefetch; - Handler *flowController = 0; - - // Set the reactor's main/default handler (see note below) - if (prefetch) { - flowController = new CFlowController(prefetch); - messagingHandler.addChildHandler(*flowController); + if (handler) { + pn_handler_t *cppHandler = cpp_handler(this, handler); + pn_reactor_set_handler(reactor, cppHandler); + pn_decref(cppHandler); } - MessagingAdapter messagingAdapter(messagingHandler); - messagingHandler.addChildHandler(messagingAdapter); - pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler); - pn_reactor_set_handler(reactor, cppHandler); // Set our own global handler that "subclasses" the existing one - pn_handler_t *cGlobalHandler = pn_reactor_get_global_handler(reactor); - pn_incref(cGlobalHandler); - OverrideHandler overrideHandler(cGlobalHandler); - pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler); + pn_handler_t *globalHandler = pn_reactor_get_global_handler(reactor); + overrideHandler = new OverrideHandler(globalHandler); + pn_handler_t *cppGlobalHandler = cpp_handler(this, overrideHandler); pn_reactor_set_global_handler(reactor, cppGlobalHandler); + pn_decref(cppGlobalHandler); // Note: we have just set up the following 4/5 handlers that see events in this order: // messagingHandler (Proton C events), pn_flowcontroller (optional), messagingAdapter, - // messagingHandler (Messaging events from the messagingAdapter), connector override, - // the reactor's default globalhandler (pn_iohandler) + // messagingHandler (Messaging events from the messagingAdapter, i.e. the delegate), + // connector override, the reactor's default globalhandler (pn_iohandler) +} + +void ContainerImpl::run() { + initializeReactor(); pn_reactor_run(reactor); +} - pn_decref(cppHandler); - pn_decref(cppGlobalHandler); - pn_decref(cGlobalHandler); - pn_reactor_free(reactor); - reactor = 0; - delete(flowController); +void ContainerImpl::start() { + initializeReactor(); + pn_reactor_start(reactor); +} + +bool ContainerImpl::process() { + if (!reactor) throw ProtonException(MSG("Container not started")); + bool result = pn_reactor_process(reactor); + // TODO: check errors + return result; +} + +void ContainerImpl::stop() { + if (!reactor) throw ProtonException(MSG("Container not started")); + pn_reactor_stop(reactor); + // TODO: check errors +} + +void ContainerImpl::wakeup() { + if (!reactor) throw ProtonException(MSG("Container not started")); + pn_reactor_wakeup(reactor); + // TODO: check errors +} + +bool ContainerImpl::isQuiesced() { + if (!reactor) throw ProtonException(MSG("Container not started")); + return pn_reactor_quiesced(reactor); } }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ContainerImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h index f7b5b9e..65a6651 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.h +++ b/proton-c/bindings/cpp/src/ContainerImpl.h @@ -25,6 +25,7 @@ #include "proton/cpp/MessagingHandler.h" #include "proton/cpp/Connection.h" #include "proton/cpp/Link.h" +#include "proton/cpp/Duration.h" #include "proton/reactor.h" @@ -40,26 +41,37 @@ class Acceptor; class ContainerImpl { public: - PROTON_CPP_EXTERN ContainerImpl(MessagingHandler &mhandler); + PROTON_CPP_EXTERN ContainerImpl(Handler &h); + PROTON_CPP_EXTERN ContainerImpl(); PROTON_CPP_EXTERN ~ContainerImpl(); - PROTON_CPP_EXTERN Connection connect(std::string &host); + PROTON_CPP_EXTERN Connection connect(std::string &host, Handler *h); PROTON_CPP_EXTERN void run(); PROTON_CPP_EXTERN pn_reactor_t *getReactor(); - PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler(); - PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr); + PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr, Handler *h); PROTON_CPP_EXTERN Sender createSender(std::string &url); PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr); PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); PROTON_CPP_EXTERN Acceptor listen(const std::string &url); PROTON_CPP_EXTERN std::string getContainerId(); + PROTON_CPP_EXTERN Duration getTimeout(); + PROTON_CPP_EXTERN void setTimeout(Duration timeout); + void start(); + bool process(); + void stop(); + void wakeup(); + bool isQuiesced(); + pn_handler_t *wrapHandler(Handler *h); static void incref(ContainerImpl *); static void decref(ContainerImpl *); private: void dispatch(pn_event_t *event, pn_event_type_t type); Acceptor acceptor(const std::string &host, const std::string &port); + void initializeReactor(); pn_reactor_t *reactor; - pn_handler_t *globalHandler; - MessagingHandler &messagingHandler; + Handler *handler; + MessagingAdapter *messagingAdapter; + Handler *overrideHandler; + Handler *flowController; std::string containerId; int refCount; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Duration.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Duration.cpp b/proton-c/bindings/cpp/src/Duration.cpp new file mode 100644 index 0000000..dac7899 --- /dev/null +++ b/proton-c/bindings/cpp/src/Duration.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Duration.h" +#include <limits> + +namespace proton { +namespace reactor { + +Duration::Duration(uint64_t ms) : milliseconds(ms) {} +uint64_t Duration::getMilliseconds() const { return milliseconds; } + +Duration operator*(const Duration& duration, uint64_t multiplier) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +Duration operator*(uint64_t multiplier, const Duration& duration) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +bool operator==(const Duration& a, const Duration& b) +{ + return a.getMilliseconds() == b.getMilliseconds(); +} + +bool operator!=(const Duration& a, const Duration& b) +{ + return a.getMilliseconds() != b.getMilliseconds(); +} + +const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max()); +const Duration Duration::IMMEDIATE(0); +const Duration Duration::SECOND(1000); +const Duration Duration::MINUTE(SECOND * 60); + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp index aab01d9..59cf039 100644 --- a/proton-c/bindings/cpp/src/Link.cpp +++ b/proton-c/bindings/cpp/src/Link.cpp @@ -96,6 +96,10 @@ Terminus Link::getRemoteTarget() { return Terminus(pn_link_remote_target(impl), this); } +std::string Link::getName() { + return std::string(pn_link_name(impl)); +} + Connection &Link::getConnection() { pn_session_t *s = pn_link_session(impl); pn_connection_t *c = pn_session_connection(s); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/MessagingAdapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp index f2916db..625485e 100644 --- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp +++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp @@ -32,14 +32,12 @@ namespace proton { namespace reactor { - MessagingAdapter::MessagingAdapter(MessagingHandler &delegate_) : - autoSettle(delegate_.autoSettle), - autoAccept(delegate_.autoAccept), - peerCloseIsError(delegate_.peerCloseIsError), + MessagingHandler(true, delegate_.prefetch, delegate_.autoSettle, delegate_.autoAccept, delegate_.peerCloseIsError), delegate(delegate_) {}; + MessagingAdapter::~MessagingAdapter(){}; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/MessagingHandler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp index 7a4a5cb..925186a 100644 --- a/proton-c/bindings/cpp/src/MessagingHandler.cpp +++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp @@ -26,11 +26,64 @@ namespace proton { namespace reactor { +namespace { +class CFlowController : public ProtonHandler +{ + public: + pn_handler_t *flowcontroller; + + CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {} + ~CFlowController() { + pn_decref(flowcontroller); + } + + void redirect(Event &e) { + ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e); + pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType()); + } + + virtual void onLinkLocalOpen(Event &e) { redirect(e); } + virtual void onLinkRemoteOpen(Event &e) { redirect(e); } + virtual void onLinkFlow(Event &e) { redirect(e); } + virtual void onDelivery(Event &e) { redirect(e); } +}; + +} // namespace + + + + MessagingHandler::MessagingHandler(int prefetch0, bool autoAccept0, bool autoSettle0, bool peerCloseIsError0) : prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0) -{} +{ + createHelpers(); +} + +MessagingHandler::MessagingHandler(bool rawHandler, int prefetch0, bool autoAccept0, bool autoSettle0, + bool peerCloseIsError0) : + prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0) +{ + if (rawHandler) { + flowController = 0; + messagingAdapter = 0; + } else { + createHelpers(); + } +} + +void MessagingHandler::createHelpers() { + if (prefetch > 0) { + flowController = new CFlowController(prefetch); + addChildHandler(*flowController); + } + messagingAdapter = new MessagingAdapter(*this); + addChildHandler(*messagingAdapter); +} -MessagingHandler::~MessagingHandler(){}; +MessagingHandler::~MessagingHandler(){ + delete flowController; + delete messagingAdapter; +}; void MessagingHandler::onAbort(Event &e) { onUnhandled(e); } void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp index c521ad1..c8f962e 100644 --- a/proton-c/bindings/cpp/src/Sender.cpp +++ b/proton-c/bindings/cpp/src/Sender.cpp @@ -54,7 +54,7 @@ namespace{ uint64_t tagCounter = 0; } -void Sender::send(Message &message) { +Delivery Sender::send(Message &message) { char tag[8]; void *ptr = &tag; uint64_t id = ++tagCounter; @@ -67,6 +67,7 @@ void Sender::send(Message &message) { pn_link_advance(link); if (pn_link_snd_settle_mode(link) == PN_SND_SETTLED) pn_delivery_settle(dlv); + return Delivery(dlv); } }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp new file mode 100644 index 0000000..3fb6010 --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Container.h" +#include "proton/cpp/BlockingConnection.h" +#include "proton/cpp/BlockingSender.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" +#include "BlockingConnectionImpl.h" +#include "PrivateImplRef.h" + +namespace proton { +namespace reactor { + +template class Handle<BlockingConnectionImpl>; +typedef PrivateImplRef<BlockingConnection> PI; + +BlockingConnection::BlockingConnection() {PI::ctor(*this, 0); } + +BlockingConnection::BlockingConnection(const BlockingConnection& c) : Handle<BlockingConnectionImpl>() { PI::copy(*this, c); } + +BlockingConnection& BlockingConnection::operator=(const BlockingConnection& c) { return PI::assign(*this, c); } +BlockingConnection::~BlockingConnection() { PI::dtor(*this); } + +BlockingConnection::BlockingConnection(std::string &url, Duration d, SslDomain *ssld, Container *c) { + BlockingConnectionImpl *cimpl = new BlockingConnectionImpl(url, d,ssld, c); + PI::ctor(*this, cimpl); +} + +void BlockingConnection::close() { impl->close(); } + +void BlockingConnection::wait(WaitCondition &cond) { return impl->wait(cond); } +void BlockingConnection::wait(WaitCondition &cond, std::string &msg, Duration timeout) { + return impl->wait(cond, msg, timeout); +} + +BlockingSender BlockingConnection::createSender(std::string &address, Handler *h) { + Sender sender = impl->container.createSender(impl->connection, address, h); + return BlockingSender(*this, sender); +} + +Duration BlockingConnection::getTimeout() { return impl->getTimeout(); } + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp new file mode 100644 index 0000000..39adb87 --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Container.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/Duration.h" +#include "proton/cpp/exceptions.h" +#include "proton/cpp/WaitCondition.h" +#include "BlockingConnectionImpl.h" +#include "Msg.h" +#include "contexts.h" + +#include "proton/connection.h" + +namespace proton { +namespace reactor { + +WaitCondition::~WaitCondition() {} + + +void BlockingConnectionImpl::incref(BlockingConnectionImpl *impl) { + impl->refCount++; +} + +void BlockingConnectionImpl::decref(BlockingConnectionImpl *impl) { + impl->refCount--; + if (impl->refCount == 0) + delete impl; +} + +namespace { +struct ConnectionOpening : public WaitCondition { + ConnectionOpening(pn_connection_t *c) : pnConnection(c) {} + bool achieved() { return (pn_connection_state(pnConnection) & PN_REMOTE_UNINIT); } + pn_connection_t *pnConnection; +}; + +struct ConnectionClosed : public WaitCondition { + ConnectionClosed(pn_connection_t *c) : pnConnection(c) {} + bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_ACTIVE); } + pn_connection_t *pnConnection; +}; + +} + + +BlockingConnectionImpl::BlockingConnectionImpl(std::string &u, Duration timeout0, SslDomain *ssld, Container *c) + : url(u), timeout(timeout0), refCount(0) +{ + if (c) + container = *c; + container.start(); + container.setTimeout(timeout); + // Create connection and send the connection events here + connection = container.connect(url, static_cast<Handler *>(this)); + ConnectionOpening cond(connection.getPnConnection()); + wait(cond); +} + +BlockingConnectionImpl::~BlockingConnectionImpl() { + container = Container(); +} + +void BlockingConnectionImpl::close() { + connection.close(); + ConnectionClosed cond(connection.getPnConnection()); + wait(cond); +} + +void BlockingConnectionImpl::wait(WaitCondition &condition) { + std::string empty; + wait(condition, empty, timeout); +} + +void BlockingConnectionImpl::wait(WaitCondition &condition, std::string &msg, Duration waitTimeout) { + if (waitTimeout == Duration::FOREVER) { + while (!condition.achieved()) { + container.process(); + } + } + + pn_reactor_t *reactor = container.getReactor(); + pn_millis_t origTimeout = pn_reactor_get_timeout(reactor); + pn_reactor_set_timeout(reactor, waitTimeout.getMilliseconds()); + try { + pn_timestamp_t now = pn_reactor_mark(reactor); + pn_timestamp_t deadline = now + waitTimeout.getMilliseconds(); + while (!condition.achieved()) { + container.process(); + if (deadline < pn_reactor_mark(reactor)) { + std::string txt = "Connection timed out"; + if (!msg.empty()) + txt += ": " + msg; + // TODO: proper Timeout exception + throw ProtonException(MSG(txt)); + } + } + } catch (...) { + pn_reactor_set_timeout(reactor, origTimeout); + throw; + } + pn_reactor_set_timeout(reactor, origTimeout); +} + + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h new file mode 100644 index 0000000..5f263ab --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h @@ -0,0 +1,63 @@ +#ifndef PROTON_CPP_CONNECTIONIMPL_H +#define PROTON_CPP_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/Endpoint.h" +#include "proton/cpp/Container.h" +#include "proton/types.h" +#include <string> + +struct pn_connection_t; + +namespace proton { +namespace reactor { + +class Handler; +class Container; +class SslDomain; + + class BlockingConnectionImpl : public MessagingHandler +{ + public: + PROTON_CPP_EXTERN BlockingConnectionImpl(std::string &url, Duration d, SslDomain *ssld, Container *c); + PROTON_CPP_EXTERN ~BlockingConnectionImpl(); + PROTON_CPP_EXTERN void close(); + PROTON_CPP_EXTERN void wait(WaitCondition &condition); + PROTON_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout); + PROTON_CPP_EXTERN pn_connection_t *getPnBlockingConnection(); + Duration getTimeout() { return timeout; } + static void incref(BlockingConnectionImpl *); + static void decref(BlockingConnectionImpl *); + private: + friend class BlockingConnection; + Container container; + Connection connection; + std::string url; + Duration timeout; + int refCount; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_CONNECTIONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp new file mode 100644 index 0000000..5a572ae --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/BlockingLink.h" +#include "proton/cpp/BlockingConnection.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/WaitCondition.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" + + +namespace proton { +namespace reactor { + +namespace { +struct LinkOpened : public WaitCondition { + LinkOpened(pn_link_t *l) : pnLink(l) {} + bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_UNINIT); } + pn_link_t *pnLink; +}; + +struct LinkClosed : public WaitCondition { + LinkClosed(pn_link_t *l) : pnLink(l) {} + bool achieved() { return (pn_link_state(pnLink) & PN_REMOTE_CLOSED); } + pn_link_t *pnLink; +}; + +struct LinkNotOpen : public WaitCondition { + LinkNotOpen(pn_link_t *l) : pnLink(l) {} + bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_ACTIVE); } + pn_link_t *pnLink; +}; + + +} // namespace + + +BlockingLink::BlockingLink(BlockingConnection *c, pn_link_t *pnl) : connection(*c), link(pnl) { + std::string msg = "Opening link " + link.getName(); + LinkOpened linkOpened(link.getPnLink()); + connection.wait(linkOpened, msg); +} + +BlockingLink::~BlockingLink() {} + +void BlockingLink::waitForClosed(Duration timeout) { + std::string msg = "Closing link " + link.getName(); + LinkClosed linkClosed(link.getPnLink()); + connection.wait(linkClosed, msg); + checkClosed(); +} + +void BlockingLink::checkClosed() { + pn_link_t * pnLink = link.getPnLink(); + if (pn_link_state(pnLink) & PN_REMOTE_CLOSED) { + link.close(); + // TODO: LinkDetached exception + throw ProtonException(MSG("Link detached")); + } +} + +void BlockingLink::close() { + link.close(); + std::string msg = "Closing link " + link.getName(); + LinkNotOpen linkNotOpen(link.getPnLink()); + connection.wait(linkNotOpen, msg); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp new file mode 100644 index 0000000..dc6b9bd --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/BlockingSender.h" +#include "proton/cpp/BlockingConnection.h" +#include "proton/cpp/WaitCondition.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" + + +namespace proton { +namespace reactor { + +namespace { +struct DeliverySettled : public WaitCondition { + DeliverySettled(pn_delivery_t *d) : pnDelivery(d) {} + bool achieved() { return pn_delivery_settled(pnDelivery); } + pn_delivery_t *pnDelivery; +}; + +} // namespace + + +BlockingSender::BlockingSender(BlockingConnection &c, Sender &l) : BlockingLink(&c, l.getPnLink()) { + std::string ta = link.getTarget().getAddress(); + std::string rta = link.getRemoteTarget().getAddress(); + if (ta.empty() || ta.compare(rta) != 0) { + waitForClosed(); + link.close(); + std::string txt = "Failed to open sender " + link.getName() + ", target does not match"; + throw ProtonException(MSG("Container not started")); + } +} + +Delivery BlockingSender::send(Message &msg, Duration timeout) { + Sender snd = link; + Delivery dlv = snd.send(msg); + std::string txt = "Sending on sender " + link.getName(); + DeliverySettled cond(dlv.getPnDelivery()); + connection.wait(cond, txt, timeout); + return dlv; +} + +Delivery BlockingSender::send(Message &msg) { + // Use default timeout + return send(msg, connection.getTimeout()); +} + +}} // namespace proton::reactor --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org