Repository: qpid-proton
Updated Branches:
  refs/heads/master ded03b190 -> f53c7683d


PROTON-1959: [cpp] API additions to simplify reconnect

messaging_handler::on_connection_start() - initial, exactly once event
messaging_handler::on_connection_reconnecting() - disconnected with 
auto-reconnect pending
bool connection::reconnected() - connection has been auto-reconnected

Added connection life-cycle doc to the messaging_handler class API doc.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f53c7683
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f53c7683
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f53c7683

Branch: refs/heads/master
Commit: f53c7683d7e903634b53db30d1f401006b43015a
Parents: 77ddf4b
Author: Alan Conway <acon...@redhat.com>
Authored: Tue Oct 30 15:14:04 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Tue Oct 30 16:37:01 2018 -0400

----------------------------------------------------------------------
 cpp/include/proton/connection.hpp        |  13 +++-
 cpp/include/proton/messaging_handler.hpp | 103 ++++++++++++++----------
 cpp/include/proton/reconnect_options.hpp |   4 +-
 cpp/src/connection.cpp                   |   6 ++
 cpp/src/contexts.cpp                     |   2 +-
 cpp/src/contexts.hpp                     |   1 +
 cpp/src/handler.cpp                      |   6 +-
 cpp/src/messaging_adapter.cpp            |   9 ++-
 cpp/src/proactor_container_impl.cpp      |  74 +++++++++++-------
 cpp/src/proactor_container_impl.hpp      |   4 +-
 cpp/src/reconnect_test.cpp               | 108 ++++++++++++++++++++++----
 11 files changed, 238 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/cpp/include/proton/connection.hpp 
b/cpp/include/proton/connection.hpp
index de9c904..ee90a04 100644
--- a/cpp/include/proton/connection.hpp
+++ b/cpp/include/proton/connection.hpp
@@ -82,14 +82,17 @@ PN_CPP_CLASS_EXTERN connection : public 
internal::object<pn_connection_t>, publi
     PN_CPP_EXTERN std::string user() const;
 
     /// Open the connection.
-    ///
-    /// @see endpoint_lifecycle
+    /// @see messaging_handler
     PN_CPP_EXTERN void open();
 
     /// @copydoc open
     PN_CPP_EXTERN void open(const connection_options&);
 
+    /// Close the connection.
+    /// @see messaging_handler
     PN_CPP_EXTERN void close();
+
+    /// @copydoc close
     PN_CPP_EXTERN void close(const error_condition&);
 
     /// Open a new session.
@@ -172,6 +175,12 @@ PN_CPP_CLASS_EXTERN connection : public 
internal::object<pn_connection_t>, publi
     /// execute code safely in the event-handler thread.
     PN_CPP_EXTERN void wake() const;
 
+    /// **Unsettled API** - true if this connection has been automatically
+    /// re-connected.
+    ///
+    /// @see reconnect_options, messaging_handler
+    PN_CPP_EXTERN bool reconnected() const;
+
     /// @cond INTERNAL
   friend class internal::factory<connection>;
   friend class container;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/cpp/include/proton/messaging_handler.hpp 
b/cpp/include/proton/messaging_handler.hpp
index 6eb3755..fbefe8e 100644
--- a/cpp/include/proton/messaging_handler.hpp
+++ b/cpp/include/proton/messaging_handler.hpp
@@ -30,59 +30,65 @@
 
 namespace proton {
 
-/// A handler for Proton messaging events.
+
+/// Handler for Proton messaging events.
 ///
 /// Subclass and override the event-handling member functions.
 ///
-/// Event handling functions can always use the objects passed as
-/// arguments.
+/// **Thread-safety**: A thread-safe handler can use the objects
+/// passed as arguments, and other objects belonging to the same
+/// proton::connection.  It *must not* use objects belonging to a
+/// different connection. See @ref mt_page and proton::work_queue for
+/// safe ways to communicate between connections.  Thread-safe
+/// handlers can also be used in single-threaded code.
+///
+/// **Single-threaded only**: An application is single-threaded if it
+/// calls container::run() exactly once, and only makes proton
+/// calls from handler functions. Single-threaded handler functions
+/// can use objects belonging to another connection, but *must* call
+/// connection::wake() on the other connection before returning. Such
+/// a handler is not thread-safe.
 ///
-/// @note A handler function **must not** use proton objects that are
-/// not accessible via the arguments passed without taking extra
-/// care. For example an on_message() handler called for connection
-/// "A" cannot simply call sender::send() on a proton::sender
-/// belonging to connection "B".
+/// ### Connection life-cycle and automatic re-connect
 ///
-/// **Thread-safety**: To be safe for both single- and multi-threaded
-/// use, a handler **must not** directly use objects belonging to
-/// another connection. See @ref mt_page and proton::work_queue for
-/// safe ways to communicate. We recommend writing safe handlers to
-/// avoid mysterious failures if the handler is ever used in a
-/// multi-threaded container.
+/// on_connection_start() is the first event for any connection.
 ///
-/// **Single-threaded only**: An application is single-threaded if it
-/// calls container::run() exactly once, and does not make proton
-/// calls from any other thread. In this case a handler can use
-/// objects belonging to another connection, but it must call
-/// connection::wake() on the other connection before returning.  Such
-/// a handler will fail mysteriously if the container is run with
-/// multiple threads.
+/// on_connection_open() means the remote peer has sent an AMQP open.
+/// For a client, this means the connection is fully open.  A server
+/// should respond with connection::open() or reject the request with
+/// connection::close().
 ///
-/// #### Close and error handling
+/// on_connection_reconnecting() may be called if automatic re-connect
+/// is enabled (see reconnect_options).  It is called when the
+/// connection is disconnected and a re-connect will be
+/// attempted. Calling connection::close() will cancel the re-connect.
 ///
-/// There are several objects that have `on_X_close` and `on_X_error`
-/// functions.  They are called as follows:
+/// on_connection_open() will be called again on a successful
+/// re-connect.  Each open @ref session, @ref sender and @ref receiver
+/// will also be automatically re-opened. On success, on_sender_open()
+/// or on_receiver_open() are called, on failure on_sender_error() or
+/// on_receiver_error().
 ///
-/// - If `X` is closed cleanly, with no error status, then `on_X_close`
-///   is called.
+/// on_connection_close() indicates orderly shut-down of the
+/// connection. Servers should respond with connection::close().
+/// on_connection_close() is not called if the connection fails before
+/// the remote end can do an orderly close.
 ///
-/// - If `X` is closed with an error, then `on_X_error` is called,
-///   followed by `on_X_close`. The error condition is also available
-///   in `on_X_close` from `X::error()`.
+/// on_transport_close() is always the final event for a connection, and
+/// is always called regardless of how the connection closed or failed.
 ///
-/// By default, if you do not implement `on_X_error`, it will call
-/// `on_error`.  If you do not implement `on_error` it will throw a
-/// `proton::error` exception, which may not be what you want but
-/// does help to identify forgotten error handling quickly.
+/// If the connection or transport closes with an error, on_connection_error()
+/// or on_transport_error() is called immediately before on_connection_close() 
or
+/// on_transport_close(). You can also check for error conditions in the close
+/// function with connection::error() or transport::error().
 ///
-/// #### Resource cleanup
+/// Note: closing a connection with the special error condition
+/// `amqp:connection:forced` is treated as a disconnect - it triggers
+/// automatic re-connect or on_transport_error()/on_transport_close(),
+/// not on_connection_close().
+///
+/// @see reconnect_options
 ///
-/// Every `on_X_open` event is paired with an `on_X_close` event which
-/// can clean up any resources created by the open handler.  In
-/// particular this is still true if an error is reported with an
-/// `on_X_error` event.  The error-handling logic doesn't have to
-/// manage resource clean up.  It can assume that the close event will
-/// be along to handle it.
 class
 PN_CPP_CLASS_EXTERN messaging_handler {
   public:
@@ -111,15 +117,28 @@ PN_CPP_CLASS_EXTERN messaging_handler {
     PN_CPP_EXTERN virtual void on_transport_open(transport&);
 
     /// The underlying network transport has closed.
+    /// This is the final event for a connection; there will be
+    /// no more events or re-connect attempts.
     PN_CPP_EXTERN virtual void on_transport_close(transport&);
 
-    /// The underlying network transport has closed with an error
-    /// condition.
+    /// The underlying network transport has disconnected unexpectedly.
     PN_CPP_EXTERN virtual void on_transport_error(transport&);
 
+    /// **Unsettled API** - Called before the connection is opened.
+    /// Use for initial setup, e.g. to open senders or receivers.
+    PN_CPP_EXTERN virtual void on_connection_start(connection&);
+
     /// The remote peer opened the connection.
+    /// Called for the initial open, and also after each successful re-connect 
if
+    /// @ref reconnect_options are set.
     PN_CPP_EXTERN virtual void on_connection_open(connection&);
 
+    /// **Unsettled API** - The connection has been disconnected and
+    /// is about to attempt an automatic re-connect.
+    /// If on_connection_reconnecting() calls connection::close() then
+    /// the reconnect attempt will be canceled.
+    PN_CPP_EXTERN virtual void on_connection_reconnecting(connection&);
+
     /// The remote peer closed the connection.
     PN_CPP_EXTERN virtual void on_connection_close(connection&);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/include/proton/reconnect_options.hpp
----------------------------------------------------------------------
diff --git a/cpp/include/proton/reconnect_options.hpp 
b/cpp/include/proton/reconnect_options.hpp
index bc4e43a..2206eec 100644
--- a/cpp/include/proton/reconnect_options.hpp
+++ b/cpp/include/proton/reconnect_options.hpp
@@ -42,10 +42,10 @@ namespace proton {
 /// reconnection attempts.  They may be open-ended or limited in time.
 /// They may be evenly spaced or increasing at an exponential rate.
 ///
-/// Options can be "chained". See @ref proton::connection_options.
-///
 /// Normal value semantics: copy or assign creates a separate copy of
 /// the options.
+///
+/// @see messaging_handler, connection_options::reconnect()
 class reconnect_options {
   public:
     /// Create an empty set of options.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp
index 223bb0c..f0219cd 100644
--- a/cpp/src/connection.cpp
+++ b/cpp/src/connection.cpp
@@ -192,4 +192,10 @@ std::vector<symbol> connection::desired_capabilities() 
const {
     return get_multiple<std::vector<symbol> >(caps);
 }
 
+bool connection::reconnected() const {
+    connection_context& cc = connection_context::get(pn_object());
+    reconnect_context* rc = cc.reconnect_context_.get();
+    return (rc && rc->reconnected_);
 }
+
+} // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp
index bd5e80a..9689de2 100644
--- a/cpp/src/contexts.cpp
+++ b/cpp/src/contexts.cpp
@@ -72,7 +72,7 @@ connection_context::connection_context() :
 {}
 
 reconnect_context::reconnect_context(const reconnect_options& ro) :
-    reconnect_options_(new reconnect_options(ro)), retries_(0), 
current_url_(-1)
+    reconnect_options_(new reconnect_options(ro)), retries_(0), 
current_url_(-1), reconnected_(false)
 {}
 
 listener_context::listener_context() : listen_handler_(0) {}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 4c5d679..4a046a4 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -107,6 +107,7 @@ class reconnect_context {
     duration delay_;
     int retries_;
     int current_url_;
+    bool reconnected_;
 };
 
 class listener_context : public context {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/handler.cpp b/cpp/src/handler.cpp
index 0e693a0..16fc38e 100644
--- a/cpp/src/handler.cpp
+++ b/cpp/src/handler.cpp
@@ -48,6 +48,7 @@ void messaging_handler::on_sendable(sender &) {}
 void messaging_handler::on_transport_close(transport &) {}
 void messaging_handler::on_transport_error(transport &t) { 
on_error(t.error()); }
 void messaging_handler::on_transport_open(transport &) {}
+
 void messaging_handler::on_connection_close(connection &) {}
 void messaging_handler::on_connection_error(connection &c) { 
on_error(c.error()); }
 void messaging_handler::on_connection_open(connection &c) {
@@ -55,6 +56,10 @@ void messaging_handler::on_connection_open(connection &c) {
         pn_connection_open(unwrap(c));
     }
 }
+void messaging_handler::on_connection_reconnecting(connection &) {}
+void messaging_handler::on_connection_start(connection &) {}
+void messaging_handler::on_connection_wake(connection&) {}
+
 void messaging_handler::on_session_close(session &) {}
 void messaging_handler::on_session_error(session &s) { on_error(s.error()); }
 void messaging_handler::on_session_open(session &s) {
@@ -85,7 +90,6 @@ void messaging_handler::on_tracker_settle(tracker &) {}
 void messaging_handler::on_delivery_settle(delivery &) {}
 void messaging_handler::on_sender_drain_start(sender &) {}
 void messaging_handler::on_receiver_drain_finish(receiver &) {}
-void messaging_handler::on_connection_wake(connection&) {}
 
 void messaging_handler::on_error(const error_condition& c) { throw 
proton::error(c.what()); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp
index e01f29f..91fbc6a 100644
--- a/cpp/src/messaging_adapter.cpp
+++ b/cpp/src/messaging_adapter.cpp
@@ -229,6 +229,13 @@ void on_connection_remote_close(messaging_handler& 
handler, pn_event_t* event) {
     pn_connection_close(conn);
 }
 
+void on_connection_bound(messaging_handler& handler, pn_event_t* event) {
+    connection c(make_wrapper(pn_event_connection(event)));
+    if (!c.reconnected()) {     // Call on_connection_start() on first connect
+        handler.on_connection_start(c);
+    }
+}
+
 void on_connection_remote_open(messaging_handler& handler, pn_event_t* event) {
     // Generate on_transport_open event here until we find a better place
     transport t(make_wrapper(pn_event_transport(event)));
@@ -303,7 +310,7 @@ void messaging_adapter::dispatch(messaging_handler& 
handler, pn_event_t* event)
 
     // Only handle events we are interested in
     switch(type) {
-
+      case PN_CONNECTION_BOUND: on_connection_bound(handler, event); break;
       case PN_CONNECTION_REMOTE_OPEN: on_connection_remote_open(handler, 
event); break;
       case PN_CONNECTION_REMOTE_CLOSE: on_connection_remote_close(handler, 
event); break;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/proactor_container_impl.cpp 
b/cpp/src/proactor_container_impl.cpp
index e0057cd..fe5e087 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -22,10 +22,12 @@
 
 #include "proton/connect_config.hpp"
 #include "proton/error_condition.hpp"
-#include "proton/listener.hpp"
 #include "proton/listen_handler.hpp"
+#include "proton/listener.hpp"
 #include "proton/reconnect_options.hpp"
+#include "proton/transport.hpp"
 #include "proton/url.hpp"
+
 #include "proton/connection.h"
 #include "proton/listener.h"
 #include "proton/proactor.h"
@@ -294,7 +296,11 @@ void container::impl::reset_reconnect(pn_connection_t* 
pnc) {
     rc->current_url_ = -1;
 }
 
-bool container::impl::setup_reconnect(pn_connection_t* pnc) {
+bool container::impl::can_reconnect(pn_connection_t* pnc) {
+    // Don't reconnect if we are locally closed, the application will
+    // not expect a connection it closed to re-open.
+    if (pn_connection_state(pnc) & PN_LOCAL_CLOSED) return false;
+
     // If container stopping don't try to reconnect
     // - we pretend to have set up a reconnect attempt so
     //   that the proactor disconnect will finish and we will exit
@@ -322,6 +328,15 @@ bool container::impl::setup_reconnect(pn_connection_t* 
pnc) {
         pn_condition_format(condition, "proton:io", "Too many reconnect 
attempts (%d)", rc->retries_);
         return false;
     }
+    return true;
+}
+
+void container::impl::setup_reconnect(pn_connection_t* pnc) {
+    connection_context& cc = connection_context::get(pnc);
+    reconnect_context* rc = cc.reconnect_context_.get();
+    if (!rc) return;
+
+    rc->reconnected_ = true;
 
     // Recover connection from proactor
     pn_proactor_release_connection(pnc);
@@ -333,8 +348,6 @@ bool container::impl::setup_reconnect(pn_connection_t* pnc) 
{
     // now anyway
     schedule(delay, make_work(&container::impl::reconnect, this, pnc));
     ++reconnecting_;
-
-    return true;
 }
 
 returned<connection> container::impl::connect(
@@ -622,10 +635,6 @@ container::impl::dispatch_result 
container::impl::dispatch(pn_event_t* event) {
     case PN_CONNECTION_INIT:
         return ContinueLoop;
 
-    // We've already applied options, so don't need to do it here
-    case PN_CONNECTION_BOUND:
-        return ContinueLoop;
-
     case PN_CONNECTION_REMOTE_OPEN: {
         // This is the only event that we get indicating that the connection 
succeeded so
         // it's the only place to reset the reconnection logic.
@@ -639,15 +648,17 @@ container::impl::dispatch_result 
container::impl::dispatch(pn_event_t* event) {
         pn_connection_t *c = pn_event_connection(event);
         pn_condition_t *cc = pn_connection_remote_condition(c);
 
-        // If we got a close with a condition of amqp:connection:forced then 
don't send
-        // any close/error events now. Just set the error condition on the 
transport and
-        // close the connection - This should act as though a transport error 
occurred
-        if (pn_condition_is_set(cc)
-            && !strcmp(pn_condition_get_name(cc), "amqp:connection:forced")) {
+        // amqp:connection:forced should be treated like a transport
+        // disconnect. Hide the connection error/close events from the
+        // application and generate a PN_TRANSPORT_CLOSE event.
+        if (pn_condition_is_set(cc) &&
+            !strcmp(pn_condition_get_name(cc), "amqp:connection:forced"))
+        {
             pn_transport_t* t = pn_event_transport(event);
             pn_condition_t* tc = pn_transport_condition(t);
             pn_condition_copy(tc, cc);
-            pn_connection_close(c);
+            pn_transport_close_head(t);
+            pn_transport_close_tail(t);
             return ContinueLoop;
         }
         break;
@@ -656,19 +667,35 @@ container::impl::dispatch_result 
container::impl::dispatch(pn_event_t* event) {
         // If reconnect is turned on then handle closed on error here with 
reconnect attempt
         pn_connection_t* c = pn_event_connection(event);
         pn_transport_t* t = pn_event_transport(event);
-        // If we successfully schedule a re-connect then hide the event from
-        // user handlers by returning here.
-        if (pn_condition_is_set(pn_transport_condition(t)) && 
setup_reconnect(c)) return ContinueLoop;
+        if (pn_condition_is_set(pn_transport_condition(t)) && 
can_reconnect(c)) {
+            messaging_handler *mh = get_handler(event);
+            if (mh) {           // Notify handler of pending reconnect
+                connection conn = make_wrapper(c);
+                mh->on_connection_reconnecting(conn);
+            }
+            // on_connection_reconnecting() may have closed the connection, 
check again.
+            if (!(pn_connection_state(c) & PN_LOCAL_CLOSED)) {
+                setup_reconnect(c);
+                return ContinueLoop;
+            }
+        }
         // Otherwise, this connection will be freed by the proactor.
         // Mark its work_queue finished so it won't try to use the freed 
connection.
         connection_context::get(c).work_queue_.impl_.get()->finished();
+        break;
     }
     default:
         break;
     }
 
-    // Figure out the handler for the primary object for event
-    messaging_handler* mh = 0;
+    messaging_handler *mh = get_handler(event);
+    if (mh) messaging_adapter::dispatch(*mh, event);
+    return ContinueLoop;
+}
+
+// Figure out the handler for the primary object for event
+messaging_handler* container::impl::get_handler(pn_event_t *event) {
+    messaging_handler *mh = 0;
 
     // First try for a link (send/receiver) handler
     pn_link_t *link = pn_event_link(event);
@@ -683,14 +710,7 @@ container::impl::dispatch_result 
container::impl::dispatch(pn_event_t* event) {
     if (connection && !mh) mh = get_handler(connection);
 
     // Use container handler if nothing more specific (must be a container 
handler)
-    if (!mh) mh = handler_;
-
-    // If we still have no handler don't do anything!
-    // This is pretty unusual, but possible if we use the default constructor 
for container
-    if (!mh) return ContinueLoop;
-
-    messaging_adapter::dispatch(*mh, event);
-    return ContinueLoop;
+    return mh ? mh : handler_;
 }
 
 void container::impl::thread() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/cpp/src/proactor_container_impl.hpp 
b/cpp/src/proactor_container_impl.hpp
index 912a036..0f0ef60 100644
--- a/cpp/src/proactor_container_impl.hpp
+++ b/cpp/src/proactor_container_impl.hpp
@@ -94,6 +94,7 @@ class container::impl {
     void schedule(duration, work);
     template <class T> static void set_handler(T s, messaging_handler* h);
     template <class T> static messaging_handler* get_handler(T s);
+    messaging_handler* get_handler(pn_event_t *event);
     static work_queue::impl* make_work_queue(container&);
 
   private:
@@ -106,7 +107,8 @@ class container::impl {
     void start_connection(const url& url, pn_connection_t* c);
     void reconnect(pn_connection_t* pnc);
     duration next_delay(reconnect_context& rc);
-    bool setup_reconnect(pn_connection_t* pnc);
+    bool can_reconnect(pn_connection_t* pnc);
+    void setup_reconnect(pn_connection_t* pnc);
     void reset_reconnect(pn_connection_t* pnc);
 
     // Event loop to run in each container thread

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/reconnect_test.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/reconnect_test.cpp b/cpp/src/reconnect_test.cpp
index f29cd53..5a56cf1 100644
--- a/cpp/src/reconnect_test.cpp
+++ b/cpp/src/reconnect_test.cpp
@@ -17,18 +17,19 @@
  * under the License.
  */
 
-
 #include "test_bits.hpp"
-#include "proton/error_condition.hpp"
 #include "proton/connection.hpp"
 #include "proton/connection_options.hpp"
 #include "proton/container.hpp"
 #include "proton/delivery.hpp"
+#include "proton/error_condition.hpp"
+#include "proton/listen_handler.hpp"
+#include "proton/listener.hpp"
 #include "proton/message.hpp"
 #include "proton/messaging_handler.hpp"
-#include "proton/listener.hpp"
-#include "proton/listen_handler.hpp"
 #include "proton/reconnect_options.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/transport.hpp"
 #include "proton/work_queue.hpp"
 
 #include "proton/internal/pn_unique_ptr.hpp"
@@ -77,7 +78,6 @@ class server_connection_handler : public 
proton::messaging_handler {
     int messages_;
     int expect_;
     bool closing_;
-    bool done_;
     listen_handler listen_handler_;
 
     void close (proton::connection &c) {
@@ -89,7 +89,7 @@ class server_connection_handler : public 
proton::messaging_handler {
 
   public:
     server_connection_handler(proton::container& c, int e, waiter& w)
-        : messages_(0), expect_(e), closing_(false), done_(false), 
listen_handler_(*this, w)
+        : messages_(0), expect_(e), closing_(false), listen_handler_(*this, w)
     {
         listener_ = c.listen("//:0", listen_handler_);
     }
@@ -106,14 +106,19 @@ class server_connection_handler : public 
proton::messaging_handler {
         else c.open();
     }
 
+    void on_receiver_open(proton::receiver &r) PN_CPP_OVERRIDE {
+        // Reduce message noise in PN_TRACE output for debugging.
+        // Only the first message is relevant
+        // Accept manually: accepting a message tells the client to finally 
close.
+        r.open(proton::receiver_options().credit_window(0).auto_accept(false));
+        r.add_credit(1);
+    }
+
     void on_message(proton::delivery & d, proton::message & m) PN_CPP_OVERRIDE 
{
         ++messages_;
         proton::connection c = d.connection();
         if (messages_==expect_) close(c);
-    }
-
-    void on_connection_close(proton::connection & c) PN_CPP_OVERRIDE {
-        done_ = true;
+        else d.accept();
     }
 
     void on_transport_error(proton::transport & ) PN_CPP_OVERRIDE {
@@ -125,7 +130,9 @@ class server_connection_handler : public 
proton::messaging_handler {
 
 class tester : public proton::messaging_handler, public waiter {
   public:
-    tester() : waiter(3), container_(*this, "reconnect_server") {}
+    tester() : waiter(3), container_(*this, "reconnect_client"),
+               start_count(0), open_count(0), reconnecting_count(0),
+               link_open_count(0), transport_error_count(0), 
transport_close_count(0) {}
 
     void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         // Server that fails upon connection
@@ -144,22 +151,53 @@ class tester : public proton::messaging_handler, public 
waiter {
         container_.connect(s1->url(), 
proton::connection_options().reconnect(proton::reconnect_options().failover_urls(urls)));
     }
 
-    void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE {
+    void on_connection_start(proton::connection& c) PN_CPP_OVERRIDE {
+        start_count++;
         c.open_sender("messages");
+        ASSERT(!c.reconnected());
+    }
+
+    void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE {
+        ASSERT(bool(open_count) == c.reconnected());
+        open_count++;
+    }
+
+    void on_connection_reconnecting(proton::connection& c) PN_CPP_OVERRIDE {
+        reconnecting_count++;
+    }
+
+    void on_sender_open(proton::sender &s) PN_CPP_OVERRIDE {
+        ASSERT(bool(link_open_count) == s.connection().reconnected());
+        link_open_count++;
     }
 
     void on_sendable(proton::sender& s) PN_CPP_OVERRIDE {
-        proton::message m;
-        m.body("hello");
-        s.send(m);
+        s.send(proton::message("hello"));
     }
 
     void on_tracker_accept(proton::tracker& d) PN_CPP_OVERRIDE {
         d.connection().close();
     }
 
+    void on_transport_error(proton::transport& t) PN_CPP_OVERRIDE {
+        transport_error_count++;
+    }
+
+    void on_transport_close(proton::transport& t) PN_CPP_OVERRIDE {
+        transport_close_count++;
+    }
+
     void run() {
         container_.run();
+        ASSERT_EQUAL(1, start_count);
+        ASSERT_EQUAL(3, open_count);
+        ASSERT(2 < reconnecting_count);
+        // Last reconnect fails before opening links
+        ASSERT(link_open_count > 1);
+        // All transport errors should have been hidden
+        ASSERT_EQUAL(0, transport_error_count);
+        // One final transport close, not an error
+        ASSERT_EQUAL(1, transport_close_count);
     }
 
   private:
@@ -167,6 +205,7 @@ class tester : public proton::messaging_handler, public 
waiter {
     proton::internal::pn_unique_ptr<server_connection_handler> s2;
     proton::internal::pn_unique_ptr<server_connection_handler> s3;
     proton::container container_;
+    int start_count, open_count, reconnecting_count, link_open_count, 
transport_error_count, transport_close_count;
 };
 
 int test_failover_simple() {
@@ -245,6 +284,44 @@ class authfail_reconnect_tester : public 
proton::messaging_handler, public waite
     bool errored_;
 };
 
+// Verify we can stop reconnecting by calling close() in 
on_connection_reconnecting()
+class test_reconnecting_close : public proton::messaging_handler, public 
waiter {
+  public:
+    test_reconnecting_close() : waiter(1), container_(*this, 
"test_reconnecting_close"),
+                                reconnecting_called(false) {}
+
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
+        s1.reset(new server_connection_handler(c, 0, *this));
+    }
+
+    void ready() PN_CPP_OVERRIDE {
+        container_.connect(s1->url(), 
proton::connection_options().reconnect(proton::reconnect_options()));
+    }
+
+    void on_connection_reconnecting(proton::connection& c) PN_CPP_OVERRIDE {
+        reconnecting_called = true;
+        c.close();                        // Abort reconnection
+    }
+
+    void on_connection_close(proton::connection& c) PN_CPP_OVERRIDE {
+        ASSERT(0);              // Not expecting any clean close
+    }
+
+    void on_transport_error(proton::transport& t) PN_CPP_OVERRIDE {
+        // Expected, don't throw
+    }
+
+    void run() {
+        container_.run();
+    }
+
+  private:
+    proton::container container_;
+    std::string err_;
+    bool reconnecting_called;
+    proton::internal::pn_unique_ptr<server_connection_handler> s1;
+};
+
 int test_auth_fail_reconnect() {
     authfail_reconnect_tester().run();
     return 0;
@@ -255,6 +332,7 @@ int main(int argc, char** argv) {
     RUN_ARGV_TEST(failed, test_failover_simple());
     RUN_ARGV_TEST(failed, test_stop_reconnect());
     RUN_ARGV_TEST(failed, test_auth_fail_reconnect());
+    RUN_ARGV_TEST(failed, test_reconnecting_close().run());
     return failed;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to