PROTON-1344: proactor batch events; renamed pn_connection_engine to pn_connection_driver.
pn_proactor_wait() returns pn_event_batch_t* rather than individual pn_event_t* to reduce thread-context switching. Added pn_collector_next() for simpler event looping. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/25706a47 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/25706a47 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/25706a47 Branch: refs/heads/master Commit: 25706a47ea8f29c0a53f5fae44195a916173cfbd Parents: 94bc296 Author: Alan Conway <[email protected]> Authored: Wed Nov 16 22:31:00 2016 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Nov 17 11:22:14 2016 -0500 ---------------------------------------------------------------------- examples/c/proactor/broker.c | 21 +- examples/c/proactor/libuv_proactor.c | 323 +++++++++++-------- examples/c/proactor/receive.c | 100 +++--- examples/c/proactor/send.c | 164 +++++----- examples/cpp/README.dox | 2 +- examples/cpp/mt/epoll_container.cpp | 76 ++--- proton-c/CMakeLists.txt | 4 +- proton-c/bindings/cpp/CMakeLists.txt | 2 +- proton-c/bindings/cpp/docs/io.md | 6 +- proton-c/bindings/cpp/docs/main.md | 2 +- .../cpp/include/proton/connection_options.hpp | 4 +- .../cpp/include/proton/io/connection_driver.hpp | 211 ++++++++++++ .../cpp/include/proton/io/connection_engine.hpp | 215 ------------ .../cpp/include/proton/messaging_handler.hpp | 2 +- .../bindings/cpp/include/proton/transport.hpp | 2 +- proton-c/bindings/cpp/src/engine_test.cpp | 12 +- proton-c/bindings/cpp/src/include/contexts.hpp | 2 +- .../bindings/cpp/src/io/connection_driver.cpp | 161 +++++++++ .../bindings/cpp/src/io/connection_engine.cpp | 160 --------- proton-c/bindings/cpp/src/receiver.cpp | 2 +- proton-c/bindings/cpp/src/thread_safe_test.cpp | 10 +- proton-c/docs/api/index.md | 42 ++- proton-c/include/proton/connection.h | 2 +- proton-c/include/proton/connection_driver.h | 243 ++++++++++++++ 
proton-c/include/proton/connection_engine.h | 313 ------------------ proton-c/include/proton/cproton.i | 3 + proton-c/include/proton/event.h | 56 ++++ proton-c/include/proton/proactor.h | 62 ++-- proton-c/src/core/connection_driver.c | 173 +++++----- proton-c/src/core/connection_engine.c | 163 ---------- proton-c/src/core/event.c | 28 +- proton-c/src/tests/refcount.c | 3 +- qpid-proton-cpp.syms | 48 +-- 33 files changed, 1259 insertions(+), 1358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index e11a8bd..66381fc 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -17,7 +17,7 @@ * under the License. */ -#include <proton/connection_engine.h> +#include <proton/connection_driver.h> #include <proton/proactor.h> #include <proton/engine.h> #include <proton/sasl.h> @@ -220,9 +220,11 @@ typedef struct broker_t { const char *container_id; /* AMQP container-id */ size_t threads; pn_millis_t heartbeat; + bool finished; } broker_t; void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) { + memset(b, 0, sizeof(*b)); b->proactor = pn_proactor(); b->listener = NULL; queues_init(&b->queues); @@ -293,8 +295,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) { const int WINDOW=10; /* Incoming credit window */ -static bool handle(broker_t* b, pn_event_t* e) { - bool more = true; +static void handle(broker_t* b, pn_event_t* e) { pn_connection_t *c = pn_event_connection(e); switch (pn_event_type(e)) { @@ -398,20 +399,24 @@ static bool handle(broker_t* b, pn_event_t* e) { break; case PN_PROACTOR_INTERRUPT: - more = false; + b->finished = true; break; default: break; } - pn_event_done(e); - return more; } static void 
broker_thread(void *void_broker) { broker_t *b = (broker_t*)void_broker; - while (handle(b, pn_proactor_wait(b->proactor))) - ; + do { + pn_event_batch_t *events = pn_proactor_wait(b->proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + handle(b, e); + } + pn_proactor_done(b->proactor, events); + } while(!b->finished); } static void usage(const char *arg0) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/libuv_proactor.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c index 8dd2706..35afd5c 100644 --- a/examples/c/proactor/libuv_proactor.c +++ b/examples/c/proactor/libuv_proactor.c @@ -22,7 +22,7 @@ #include <uv.h> #include <proton/condition.h> -#include <proton/connection_engine.h> +#include <proton/connection_driver.h> #include <proton/engine.h> #include <proton/extra.h> #include <proton/message.h> @@ -44,11 +44,15 @@ To provide concurrency the proactor uses a "leader-worker-follower" model, threads take turns at the roles: - - a single "leader" calls libuv functions and runs the uv_loop incrementally. - When there is work it hands over leadership and becomes a "worker" + - a single "leader" calls libuv functions and runs the uv_loop in short bursts + to generate work. When there is work available it gives up leadership and + becomes a "worker" + - "workers" handle events concurrently for distinct connections/listeners - When the work is done they become "followers" - - "followers" wait for the leader to step aside, one takes over as new leader. + They do as much work as they can get, when none is left they become "followers" + + - "followers" wait for the leader to generate work and become workers. + When the leader itself becomes a worker, one of the followers takes over. This model is symmetric: any thread can take on any role based on run-time requirements. 
It also allows the IO and non-IO work associated with an IO @@ -77,7 +81,7 @@ PN_HANDLE(PN_PROACTOR) PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) -/* common to connection engine and listeners */ +/* common to connection and listener */ typedef struct psocket_t { /* Immutable */ pn_proactor_t *proactor; @@ -118,11 +122,11 @@ static inline const char* fixstr(const char* str) { return str[0] == '\001' ? NULL : str; } -typedef struct pconn { +typedef struct pconnection_t { psocket_t psocket; /* Only used by owner thread */ - pn_connection_engine_t ceng; + pn_connection_driver_t driver; /* Only used by leader */ uv_connect_t connect; @@ -132,7 +136,7 @@ typedef struct pconn { size_t writing; bool reading:1; bool server:1; /* accept, not connect */ -} pconn; +} pconnection_t; struct pn_listener_t { psocket_t psocket; @@ -140,6 +144,7 @@ struct pn_listener_t { /* Only used by owner thread */ pn_condition_t *condition; pn_collector_t *collector; + pn_event_batch_t batch; size_t backlog; }; @@ -153,6 +158,10 @@ struct pn_proactor_t { uv_loop_t loop; uv_async_t async; + /* Owner thread: proactor collector and batch can belong to leader or a worker */ + pn_collector_t *collector; + pn_event_batch_t batch; + /* Protected by lock */ uv_mutex_t lock; queue start_q; @@ -162,11 +171,7 @@ struct pn_proactor_t { size_t count; /* psocket count */ bool inactive:1; bool has_leader:1; - - /* Immutable collectors to hold fixed events */ - pn_collector_t *interrupt_event; - pn_collector_t *timeout_event; - pn_collector_t *inactive_event; + bool batch_working:1; /* batch belongs to a worker. */ }; static bool push_lh(queue *q, psocket_t *ps) { @@ -191,8 +196,8 @@ static psocket_t* pop_lh(queue *q) { return ps; } -static inline pconn *as_pconn(psocket_t* ps) { - return ps->is_conn ? (pconn*)ps : NULL; +static inline pconnection_t *as_pconnection_t(psocket_t* ps) { + return ps->is_conn ? 
(pconnection_t*)ps : NULL; } static inline pn_listener_t *as_listener(psocket_t* ps) { @@ -213,9 +218,9 @@ static void to_leader(psocket_t *ps) { /* Detach from IO and put ps on the worker queue */ static void leader_to_worker(psocket_t *ps) { - pconn *pc = as_pconn(ps); + pconnection_t *pc = as_pconnection_t(ps); /* Don't detach if there are no events yet. */ - if (pc && pn_connection_engine_has_event(&pc->ceng)) { + if (pc && pn_connection_driver_has_event(&pc->driver)) { if (pc->writing) { pc->writing = 0; uv_cancel((uv_req_t*)&pc->write); @@ -236,6 +241,28 @@ static void leader_to_worker(psocket_t *ps) { uv_mutex_unlock(&ps->proactor->lock); } +/* Set a deferred action for leader, if not already set. */ +static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) { + uv_mutex_lock(&ps->proactor->lock); + if (!ps->action) { + ps->action = action; + } + to_leader_lh(ps); + uv_mutex_unlock(&ps->proactor->lock); +} + +/* Owner thread send to worker thread. Set deferred action if not already set. 
*/ +static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) { + uv_mutex_lock(&ps->proactor->lock); + if (!ps->action) { + ps->action = action; + } + push_lh(&ps->proactor->worker_q, ps); + uv_async_send(&ps->proactor->async); /* Wake leader */ + uv_mutex_unlock(&ps->proactor->lock); +} + + /* Re-queue for further work */ static void worker_requeue(psocket_t* ps) { uv_mutex_lock(&ps->proactor->lock); @@ -244,25 +271,43 @@ static void worker_requeue(psocket_t* ps) { uv_mutex_unlock(&ps->proactor->lock); } -static pconn *new_pconn(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) { - pconn *pc = (pconn*)calloc(1, sizeof(*pc)); +static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) { + pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); if (!pc) return NULL; - if (pn_connection_engine_init(&pc->ceng, pn_connection_with_extra(extra.size), NULL) != 0) { + if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) { return NULL; } if (extra.start && extra.size) { - memcpy(pn_connection_get_extra(pc->ceng.connection).start, extra.start, extra.size); + memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size); } psocket_init(&pc->psocket, p, true, host, port); if (server) { - pn_transport_set_server(pc->ceng.transport); + pn_transport_set_server(pc->driver.transport); } - pn_record_t *r = pn_connection_attachments(pc->ceng.connection); + pn_record_t *r = pn_connection_attachments(pc->driver.connection); pn_record_def(r, PN_PROACTOR, PN_VOID); pn_record_set(r, PN_PROACTOR, pc); return pc; } +static pn_event_t *listener_batch_next(pn_event_batch_t *batch); +static pn_event_t *proactor_batch_next(pn_event_batch_t *batch); + +static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) { + return (batch->next_event == proactor_batch_next) ? 
+ (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL; +} + +static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) { + return (batch->next_event == listener_batch_next) ? + (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL; +} + +static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { + pn_connection_driver_t *d = pn_event_batch_connection_driver(batch); + return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL; +} + pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) { pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size)); if (!l) { @@ -278,6 +323,7 @@ pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port } psocket_init(&l->psocket, p, false, host, port); l->condition = pn_condition(); + l->batch.next_event = listener_batch_next; l->backlog = backlog; return l; } @@ -290,11 +336,12 @@ static void leader_count(pn_proactor_t *p, int change) { } /* Free if there are no uv callbacks pending and no events */ -static void leader_pconn_maybe_free(pconn *pc) { - if (pn_connection_engine_has_event(&pc->ceng)) { +static void leader_pconnection_t_maybe_free(pconnection_t *pc) { + if (pn_connection_driver_has_event(&pc->driver)) { leader_to_worker(&pc->psocket); /* Return to worker */ - } else if (!(pc->psocket.tcp.data || pc->shutdown.data || pc->timer.data)) { - pn_connection_engine_destroy(&pc->ceng); + } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) { + /* All UV requests are finished */ + pn_connection_driver_destroy(&pc->driver); leader_count(pc->psocket.proactor, -1); free(pc); } @@ -314,7 +361,7 @@ static void leader_listener_maybe_free(pn_listener_t *l) { /* Free if there are no uv callbacks pending and no events */ static void leader_maybe_free(psocket_t *ps) { if (ps->is_conn) { - 
leader_pconn_maybe_free(as_pconn(ps)); + leader_pconnection_t_maybe_free(as_pconnection_t(ps)); } else { leader_listener_maybe_free(as_listener(ps)); } @@ -336,9 +383,9 @@ static inline void leader_close(psocket_t *ps) { if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) { uv_close((uv_handle_t*)&ps->tcp, on_close); } - pconn *pc = as_pconn(ps); + pconnection_t *pc = as_pconnection_t(ps); if (pc) { - pn_connection_engine_close(&pc->ceng); + pn_connection_driver_close(&pc->driver); if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { uv_timer_stop(&pc->timer); uv_close((uv_handle_t*)&pc->timer, on_close); @@ -347,20 +394,20 @@ static inline void leader_close(psocket_t *ps) { leader_maybe_free(ps); } -static pconn *get_pconn(pn_connection_t* c) { +static pconnection_t *get_pconnection_t(pn_connection_t* c) { if (!c) return NULL; pn_record_t *r = pn_connection_attachments(c); - return (pconn*) pn_record_get(r, PN_PROACTOR); + return (pconnection_t*) pn_record_get(r, PN_PROACTOR); } static void leader_error(psocket_t *ps, int err, const char* what) { if (ps->is_conn) { - pn_connection_engine_t *ceng = &as_pconn(ps)->ceng; - pn_connection_engine_errorf(ceng, COND_NAME, "%s %s:%s: %s", + pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver; + pn_connection_driver_bind(driver); /* Bind so errors will be reported */ + pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", what, fixstr(ps->host), fixstr(ps->port), uv_strerror(err)); - pn_connection_engine_bind(ceng); - pn_connection_engine_close(ceng); + pn_connection_driver_close(driver); } else { pn_listener_t *l = as_listener(ps); pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", @@ -376,9 +423,9 @@ static int leader_init(psocket_t *ps) { leader_count(ps->proactor, +1); int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp); if (!err) { - pconn *pc = as_pconn(ps); + pconnection_t *pc = as_pconnection_t(ps); if (pc) { - pc->connect.data = pc->write.data = pc->shutdown.data 
= ps; + pc->connect.data = ps; int err = uv_timer_init(&ps->proactor->loop, &pc->timer); if (!err) { pc->timer.data = pc; @@ -392,7 +439,7 @@ static int leader_init(psocket_t *ps) { } /* Common logic for on_connect and on_accept */ -static void leader_connect_accept(pconn *pc, int err, const char *what) { +static void leader_connect_accept(pconnection_t *pc, int err, const char *what) { if (!err) { leader_to_worker(&pc->psocket); } else { @@ -401,14 +448,14 @@ static void leader_connect_accept(pconn *pc, int err, const char *what) { } static void on_connect(uv_connect_t *connect, int err) { - leader_connect_accept((pconn*)connect->data, err, "on connect"); + leader_connect_accept((pconnection_t*)connect->data, err, "on connect"); } static void on_accept(uv_stream_t* server, int err) { pn_listener_t* l = (pn_listener_t*)server->data; if (!err) { pn_rwbytes_t v = pn_listener_get_extra(l); - pconn *pc = new_pconn(l->psocket.proactor, true, + pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true, fixstr(l->psocket.host), fixstr(l->psocket.port), pn_bytes(v.size, v.start)); @@ -436,7 +483,7 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { } static void leader_connect(psocket_t *ps) { - pconn *pc = as_pconn(ps); + pconnection_t *pc = as_pconnection_t(ps); uv_getaddrinfo_t info; int err = leader_resolve(ps, &info, false); if (!err) { @@ -450,7 +497,7 @@ static void leader_connect(psocket_t *ps) { static void leader_listen(psocket_t *ps) { pn_listener_t *l = as_listener(ps); - uv_getaddrinfo_t info; + uv_getaddrinfo_t info; int err = leader_resolve(ps, &info, true); if (!err) { err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0); @@ -463,8 +510,8 @@ static void leader_listen(psocket_t *ps) { } static void on_tick(uv_timer_t *timer) { - pconn *pc = (pconn*)timer->data; - pn_transport_t *t = pc->ceng.transport; + pconnection_t *pc = (pconnection_t*)timer->data; + pn_transport_t *t = pc->driver.transport; if 
(pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { uv_timer_stop(&pc->timer); uint64_t now = uv_now(pc->timer.loop); @@ -476,24 +523,25 @@ static void on_tick(uv_timer_t *timer) { } static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - pconn *pc = (pconn*)stream->data; + pconnection_t *pc = (pconnection_t*)stream->data; if (nread >= 0) { - pn_connection_engine_read_done(&pc->ceng, nread); + pn_connection_driver_read_done(&pc->driver, nread); on_tick(&pc->timer); /* check for tick changes. */ leader_to_worker(&pc->psocket); /* Reading continues automatically until stopped. */ } else if (nread == UV_EOF) { /* hangup */ - pn_connection_engine_read_close(&pc->ceng); + pn_connection_driver_read_close(&pc->driver); leader_maybe_free(&pc->psocket); } else { leader_error(&pc->psocket, nread, "on read from"); } } -static void on_write(uv_write_t* request, int err) { - pconn *pc = (pconn*)request->data; +static void on_write(uv_write_t* write, int err) { + pconnection_t *pc = (pconnection_t*)write->data; + write->data = NULL; if (err == 0) { - pn_connection_engine_write_done(&pc->ceng, pc->writing); + pn_connection_driver_write_done(&pc->driver, pc->writing); leader_to_worker(&pc->psocket); } else if (err == UV_ECANCELED) { leader_maybe_free(&pc->psocket); @@ -505,29 +553,31 @@ static void on_write(uv_write_t* request, int err) { // Read buffer allocation function for uv, just returns the transports read buffer. 
static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { - pconn *pc = (pconn*)stream->data; - pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng); + pconnection_t *pc = (pconnection_t*)stream->data; + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); *buf = uv_buf_init(rbuf.start, rbuf.size); } static void leader_rewatch(psocket_t *ps) { - pconn *pc = as_pconn(ps); + pconnection_t *pc = as_pconnection_t(ps); if (pc->timer.data) { /* uv-initialized */ on_tick(&pc->timer); /* Re-enable ticks if required */ } - pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&pc->ceng); - pn_bytes_t wbuf = pn_connection_engine_write_buffer(&pc->ceng); + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); + pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); /* Ticks and checking buffers can generate events, process before proceeding */ - if (pn_connection_engine_has_event(&pc->ceng)) { + if (pn_connection_driver_has_event(&pc->driver)) { leader_to_worker(ps); } else { /* Re-watch for IO */ if (wbuf.size > 0 && !pc->writing) { pc->writing = wbuf.size; uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); + pc->write.data = ps; uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); - } else if (wbuf.size == 0 && pn_connection_engine_write_closed(&pc->ceng)) { + } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { + pc->shutdown.data = ps; uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); } if (rbuf.size > 0 && !pc->reading) { @@ -537,23 +587,31 @@ static void leader_rewatch(psocket_t *ps) { } } -/* Return the next worker event or { 0 } if no events are ready */ -static pn_event_t* get_event_lh(pn_proactor_t *p) { - if (p->inactive) { - p->inactive = false; - return pn_collector_peek(p->inactive_event); - } - if (p->interrupt > 0) { - --p->interrupt; - return pn_collector_peek(p->interrupt_event); +static pn_event_batch_t 
*proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) { + pn_collector_put(p->collector, pn_proactor__class(), p, t); + p->batch_working = true; + return &p->batch; +} + +/* Return the next event batch or 0 if no events are ready */ +static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { + if (!p->batch_working) { /* Can generate proactor events */ + if (p->inactive) { + p->inactive = false; + return proactor_batch_lh(p, PN_PROACTOR_INACTIVE); + } + if (p->interrupt > 0) { + --p->interrupt; + return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); + } } for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { if (ps->is_conn) { - pconn *pc = as_pconn(ps); - return pn_connection_engine_event(&pc->ceng); + pconnection_t *pc = as_pconnection_t(ps); + return &pc->driver.batch; } else { /* Listener */ pn_listener_t *l = as_listener(ps); - return pn_collector_peek(l->collector); + return &l->batch; } to_leader(ps); /* No event, back to leader */ } @@ -568,15 +626,6 @@ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) { uv_mutex_unlock(&ps->proactor->lock); } -/* Defer an action to the leader thread. Only from non-leader threads. */ -static void owner_defer(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - assert(!ps->action); - ps->action = action; - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - pn_listener_t *pn_event_listener(pn_event_t *e) { return (pn_event_class(e) == pn_listener__class()) ? 
(pn_listener_t*)pn_event_context(e) : NULL; } @@ -590,57 +639,47 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) { return NULL; } -void pn_event_done(pn_event_t *e) { - pn_event_type_t etype = pn_event_type(e); - pconn *pc = get_pconn(pn_event_connection(e)); - if (pc && e == pn_collector_peek(pc->ceng.collector)) { - pn_connection_engine_pop_event(&pc->ceng); - if (etype == PN_CONNECTION_INIT) { - /* Bind after user has handled CONNECTION_INIT */ - pn_connection_engine_bind(&pc->ceng); - } - if (pn_connection_engine_has_event(&pc->ceng)) { - /* Process all events before going back to IO. - Put it back on the worker queue and wake the leader. - */ +void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { + pconnection_t *pc = batch_pconnection(batch); + if (pc) { + if (pn_connection_driver_has_event(&pc->driver)) { + /* Process all events before going back to IO. */ worker_requeue(&pc->psocket); - } else if (pn_connection_engine_finished(&pc->ceng)) { - owner_defer(&pc->psocket, leader_close); + } else if (pn_connection_driver_finished(&pc->driver)) { + owner_to_leader(&pc->psocket, leader_close); } else { - owner_defer(&pc->psocket, leader_rewatch); - } - } else { - pn_listener_t *l = pn_event_listener(e); - if (l && e == pn_collector_peek(l->collector)) { - pn_collector_pop(l->collector); - if (etype == PN_LISTENER_CLOSE) { - owner_defer(&l->psocket, leader_close); - } + owner_to_leader(&pc->psocket, leader_rewatch); } + return; + } + pn_proactor_t *bp = batch_proactor(batch); + if (bp == p) { + uv_mutex_lock(&p->lock); + p->batch_working = false; + uv_async_send(&p->async); /* Wake leader */ + uv_mutex_unlock(&p->lock); + return; } + /* Nothing extra to do for listener, it is always in the UV loop. 
*/ } /* Run follower/leader loop till we can return an event and be a worker */ -pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) { +pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { uv_mutex_lock(&p->lock); /* Try to grab work immediately. */ - pn_event_t *e = get_event_lh(p); - if (e == NULL) { + pn_event_batch_t *batch = get_batch_lh(p); + if (batch == NULL) { /* No work available, follow the leader */ - while (p->has_leader) + while (p->has_leader) { uv_cond_wait(&p->cond, &p->lock); + } /* Lead till there is work to do. */ p->has_leader = true; - for (e = get_event_lh(p); e == NULL; e = get_event_lh(p)) { - /* Run uv_loop outside the lock */ - uv_mutex_unlock(&p->lock); - uv_run(&p->loop, UV_RUN_ONCE); - uv_mutex_lock(&p->lock); - /* Process leader work queue outside the lock */ + while (batch == NULL) { for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { void (*action)(psocket_t*) = ps->action; - ps->action = NULL; void (*wakeup)(psocket_t*) = ps->wakeup; + ps->action = NULL; ps->wakeup = NULL; if (action || wakeup) { uv_mutex_unlock(&p->lock); @@ -649,13 +688,19 @@ pn_event_t *pn_proactor_wait(struct pn_proactor_t* p) { uv_mutex_lock(&p->lock); } } + batch = get_batch_lh(p); + if (batch == NULL) { + uv_mutex_unlock(&p->lock); + uv_run(&p->loop, UV_RUN_ONCE); + uv_mutex_lock(&p->lock); + } } /* Signal the next leader and return to work */ p->has_leader = false; uv_cond_signal(&p->cond); } uv_mutex_unlock(&p->lock); - return e; + return batch; } void pn_proactor_interrupt(pn_proactor_t *p) { @@ -666,11 +711,12 @@ void pn_proactor_interrupt(pn_proactor_t *p) { } int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) { - pconn *pc = new_pconn(p, false, host, port, extra); + pconnection_t *pc = new_pconnection_t(p, false, host, port, extra); if (!pc) { return PN_OUT_OF_MEMORY; } - owner_defer(&pc->psocket, leader_connect); /* Process PN_CONNECTION_INIT before binding */ + /* Process 
PN_CONNECTION_INIT before binding */ + owner_to_worker(&pc->psocket, leader_connect); return 0; } @@ -678,12 +724,12 @@ pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_li pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) { pn_listener_t *l = new_listener(p, host, port, backlog, extra); - if (l) owner_defer(&l->psocket, leader_listen); + if (l) owner_to_leader(&l->psocket, leader_listen); return l; } pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { - pconn *pc = get_pconn(c); + pconnection_t *pc = get_pconnection_t(c); return pc ? pc->psocket.proactor : NULL; } @@ -692,13 +738,14 @@ pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { } void leader_wake_connection(psocket_t *ps) { - pconn *pc = as_pconn(ps); - pn_collector_put(pc->ceng.collector, PN_OBJECT, pc->ceng.connection, PN_CONNECTION_WAKE); + pconnection_t *pc = as_pconnection_t(ps); + pn_connection_t *c = pc->driver.connection; + pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE); leader_to_worker(ps); } void pn_connection_wake(pn_connection_t* c) { - wakeup(&get_pconn(c)->psocket, leader_wake_connection); + wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection); } void pn_listener_close(pn_listener_t* l) { @@ -710,22 +757,15 @@ pn_condition_t* pn_listener_condition(pn_listener_t* l) { return l->condition; } -/* Collector to hold for a single fixed event that is never popped. 
*/ -static pn_collector_t *event_holder(pn_proactor_t *p, pn_event_type_t t) { - pn_collector_t *c = pn_collector(); - pn_collector_put(c, pn_proactor__class(), p, t); - return c; -} - pn_proactor_t *pn_proactor() { pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); + p->collector = pn_collector(); + p->batch.next_event = &proactor_batch_next; + if (!p->collector) return NULL; uv_loop_init(&p->loop); uv_mutex_init(&p->lock); uv_cond_init(&p->cond); uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */ - p->interrupt_event = event_holder(p, PN_PROACTOR_INTERRUPT); - p->inactive_event = event_holder(p, PN_PROACTOR_INACTIVE); - p->timeout_event = event_holder(p, PN_PROACTOR_TIMEOUT); return p; } @@ -741,8 +781,19 @@ void pn_proactor_free(pn_proactor_t *p) { uv_loop_close(&p->loop); uv_mutex_destroy(&p->lock); uv_cond_destroy(&p->cond); - pn_collector_free(p->interrupt_event); - pn_collector_free(p->inactive_event); - pn_collector_free(p->timeout_event); + pn_collector_free(p->collector); free(p); } + +static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { + pn_listener_t *l = batch_listener(batch); + pn_event_t *handled = pn_collector_prev(l->collector); + if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) { + owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */ + } + return pn_collector_next(l->collector); +} + +static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { + return pn_collector_next(batch_proactor(batch)->collector); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c index acdae0c..88e3456 100644 --- a/examples/c/proactor/receive.c +++ b/examples/c/proactor/receive.c @@ -20,7 +20,7 @@ */ #include <proton/connection.h> -#include <proton/connection_engine.h> +#include 
<proton/connection_driver.h> #include <proton/delivery.h> #include <proton/proactor.h> #include <proton/link.h> @@ -37,12 +37,13 @@ typedef char str[1024]; typedef struct app_data_t { - str address; - str container_id; - pn_rwbytes_t message_buffer; - int message_count; - int received; - pn_proactor_t *proactor; + str address; + str container_id; + pn_rwbytes_t message_buffer; + int message_count; + int received; + pn_proactor_t *proactor; + bool finished; } app_data_t; static const int BATCH = 100; /* Batch size for unlimited receive */ @@ -80,9 +81,7 @@ static void decode_message(pn_delivery_t *dlv) { } } -/* Handle event, return true of we should handle more */ -static bool handle(app_data_t* app, pn_event_t* event) { - bool more = true; +static void handle(app_data_t* app, pn_event_t* event) { switch (pn_event_type(event)) { case PN_CONNECTION_INIT: { @@ -149,53 +148,58 @@ static bool handle(app_data_t* app, pn_event_t* event) { break; case PN_PROACTOR_INACTIVE: - more = false; + app->finished = true; break; default: break; } - pn_event_done(event); - return more; } static void usage(const char *arg0) { - fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0); - exit(1); + fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0); + exit(1); } int main(int argc, char **argv) { - /* Default values for application and connection. */ - app_data_t app = {{0}}; - app.message_count = 100; - const char* urlstr = NULL; - - int opt; - while((opt = getopt(argc, argv, "a:m:")) != -1) { - switch(opt) { - case 'a': urlstr = optarg; break; - case 'm': app.message_count = atoi(optarg); break; - default: usage(argv[0]); break; - } + /* Default values for application and connection. 
*/ + app_data_t app = {{0}}; + app.message_count = 100; + const char* urlstr = NULL; + + int opt; + while((opt = getopt(argc, argv, "a:m:")) != -1) { + switch(opt) { + case 'a': urlstr = optarg; break; + case 'm': app.message_count = atoi(optarg); break; + default: usage(argv[0]); break; } - if (optind < argc) - usage(argv[0]); - - snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid()); - - /* Parse the URL or use default values */ - pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; - const char *host = url ? pn_url_get_host(url) : NULL; - const char *port = url ? pn_url_get_port(url) : "amqp"; - strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address)); - - /* Create the proactor and connect */ - app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null); - if (url) pn_url_free(url); - - while (handle(&app, pn_proactor_wait(app.proactor))) - ; - pn_proactor_free(app.proactor); - free(app.message_buffer.start); - return exit_code; + } + if (optind < argc) + usage(argv[0]); + + snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid()); + + /* Parse the URL or use default values */ + pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; + const char *host = url ? pn_url_get_host(url) : NULL; + const char *port = url ? pn_url_get_port(url) : "amqp"; + strncpy(app.address, (url && pn_url_get_path(url)) ? 
pn_url_get_path(url) : "example", sizeof(app.address)); + + /* Create the proactor and connect */ + app.proactor = pn_proactor(); + pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null); + if (url) pn_url_free(url); + + do { + pn_event_batch_t *events = pn_proactor_wait(app.proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + handle(&app, e); + } + pn_proactor_done(app.proactor, events); + } while(!app.finished); + + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c index 5d58895..d64ea2d 100644 --- a/examples/c/proactor/send.c +++ b/examples/c/proactor/send.c @@ -20,7 +20,7 @@ */ #include <proton/connection.h> -#include <proton/connection_engine.h> +#include <proton/connection_driver.h> #include <proton/delivery.h> #include <proton/proactor.h> #include <proton/link.h> @@ -37,13 +37,14 @@ typedef char str[1024]; typedef struct app_data_t { - str address; - str container_id; - pn_rwbytes_t message_buffer; - int message_count; - int sent; - int acknowledged; - pn_proactor_t *proactor; + str address; + str container_id; + pn_rwbytes_t message_buffer; + int message_count; + int sent; + int acknowledged; + pn_proactor_t *proactor; + bool finished; } app_data_t; int exit_code = 0; @@ -58,41 +59,39 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) { /* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. 
*/ static pn_bytes_t encode_message(app_data_t* app) { - /* Construct a message with the map { "sequence": app.sent } */ - pn_message_t* message = pn_message(); - pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ - pn_data_t* body = pn_message_body(message); - pn_data_put_map(body); - pn_data_enter(body); - pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); - pn_data_put_int(body, app->sent); /* The sequence number */ - pn_data_exit(body); - - /* encode the message, expanding the encode buffer as needed */ - if (app->message_buffer.start == NULL) { - static const size_t initial_size = 128; - app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); - } - /* app->message_buffer is the total buffer space available. */ - /* mbuf wil point at just the portion used by the encoded message */ - pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); - int status = 0; - while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { - app->message_buffer.size *= 2; - app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); - mbuf.size = app->message_buffer.size; - } - if (status != 0) { - fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); - exit(1); - } - pn_message_free(message); - return pn_bytes(mbuf.size, mbuf.start); + /* Construct a message with the map { "sequence": app.sent } */ + pn_message_t* message = pn_message(); + pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_t* body = pn_message_body(message); + pn_data_put_map(body); + pn_data_enter(body); + pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); + pn_data_put_int(body, app->sent); /* The sequence number */ + pn_data_exit(body); + + /* encode the message, expanding the encode buffer as needed */ + if (app->message_buffer.start == NULL) { + static 
const size_t initial_size = 128; + app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); + } + /* app->message_buffer is the total buffer space available. */ + /* mbuf wil point at just the portion used by the encoded message */ + pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); + int status = 0; + while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { + app->message_buffer.size *= 2; + app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); + mbuf.size = app->message_buffer.size; + } + if (status != 0) { + fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + exit(1); + } + pn_message_free(message); + return pn_bytes(mbuf.size, mbuf.start); } -/* Handle event, return true of we should handle more */ -static bool handle(app_data_t* app, pn_event_t* event) { - bool more = true; +static void handle(app_data_t* app, pn_event_t* event) { switch (pn_event_type(event)) { case PN_CONNECTION_INIT: { @@ -130,7 +129,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { } } break; - case PN_TRANSPORT_ERROR: + case PN_TRANSPORT_CLOSED: check_condition(event, pn_transport_condition(pn_event_transport(event))); break; @@ -151,53 +150,58 @@ static bool handle(app_data_t* app, pn_event_t* event) { break; case PN_PROACTOR_INACTIVE: - more = false; + app->finished = true; break; default: break; } - pn_event_done(event); - return more; } static void usage(const char *arg0) { - fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0); - exit(1); + fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0); + exit(1); } int main(int argc, char **argv) { - /* Default values for application and connection. 
*/ - app_data_t app = {{0}}; - app.message_count = 100; - const char* urlstr = NULL; - - int opt; - while((opt = getopt(argc, argv, "a:m:")) != -1) { - switch(opt) { - case 'a': urlstr = optarg; break; - case 'm': app.message_count = atoi(optarg); break; - default: usage(argv[0]); break; - } + /* Default values for application and connection. */ + app_data_t app = {{0}}; + app.message_count = 100; + const char* urlstr = NULL; + + int opt; + while((opt = getopt(argc, argv, "a:m:")) != -1) { + switch(opt) { + case 'a': urlstr = optarg; break; + case 'm': app.message_count = atoi(optarg); break; + default: usage(argv[0]); break; } - if (optind < argc) - usage(argv[0]); - - snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid()); - - /* Parse the URL or use default values */ - pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; - const char *host = url ? pn_url_get_host(url) : NULL; - const char *port = url ? pn_url_get_port(url) : "amqp"; - strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address)); - - /* Create the proactor and connect */ - app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null); - if (url) pn_url_free(url); - - while (handle(&app, pn_proactor_wait(app.proactor))) - ; - pn_proactor_free(app.proactor); - free(app.message_buffer.start); - return exit_code; + } + if (optind < argc) + usage(argv[0]); + + snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid()); + + /* Parse the URL or use default values */ + pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; + const char *host = url ? pn_url_get_host(url) : NULL; + const char *port = url ? pn_url_get_port(url) : "amqp"; + strncpy(app.address, (url && pn_url_get_path(url)) ? 
pn_url_get_path(url) : "example", sizeof(app.address)); + + /* Create the proactor and connect */ + app.proactor = pn_proactor(); + pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null); + if (url) pn_url_free(url); + + do { + pn_event_batch_t *events = pn_proactor_wait(app.proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + handle(&app, e); + } + pn_proactor_done(app.proactor, events); + } while(!app.finished); + + pn_proactor_free(app.proactor); + free(app.message_buffer.start); + return exit_code; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/cpp/README.dox ---------------------------------------------------------------------- diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox index 1d46ec8..447d3ad 100644 --- a/examples/cpp/README.dox +++ b/examples/cpp/README.dox @@ -123,7 +123,7 @@ subscribe. /** @example mt/epoll_container.cpp An example implementation of the proton::container API that shows how -to use the proton::io::connection_engine SPI to adapt the proton API +to use the proton::io::connection_driver SPI to adapt the proton API to native IO, in this case using a multithreaded Linux epoll poller as the implementation. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/examples/cpp/mt/epoll_container.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp index d9b9f08..7646673 100644 --- a/examples/cpp/mt/epoll_container.cpp +++ b/examples/cpp/mt/epoll_container.cpp @@ -25,7 +25,7 @@ #include <proton/url.hpp> #include <proton/io/container_impl_base.hpp> -#include <proton/io/connection_engine.hpp> +#include <proton/io/connection_driver.hpp> #include <proton/io/link_namer.hpp> #include <atomic> @@ -97,7 +97,7 @@ class unique_fd { }; class pollable; -class pollable_engine; +class pollable_driver; class pollable_listener; class epoll_container : public proton::io::container_impl_base { @@ -124,7 +124,7 @@ class epoll_container : public proton::io::container_impl_base { std::string id() const OVERRIDE { return id_; } // Functions used internally. - proton::connection add_engine(proton::connection_options opts, int fd, bool server); + proton::connection add_driver(proton::connection_options opts, int fd, bool server); void erase(pollable*); // Link names must be unique per container. @@ -160,7 +160,7 @@ class epoll_container : public proton::io::container_impl_base { proton::connection_options options_; std::map<std::string, std::unique_ptr<pollable_listener> > listeners_; - std::map<pollable*, std::unique_ptr<pollable_engine> > engines_; + std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_; std::condition_variable stopped_; bool stopping_; @@ -274,21 +274,21 @@ class epoll_event_loop : public proton::event_loop { bool closed_; }; -// Handle epoll wakeups for a connection_engine. -class pollable_engine : public pollable { +// Handle epoll wakeups for a connection_driver. 
+class pollable_driver : public pollable { public: - pollable_engine(epoll_container& c, int fd, int epoll_fd) : + pollable_driver(epoll_container& c, int fd, int epoll_fd) : pollable(fd, epoll_fd), loop_(new epoll_event_loop(*this)), - engine_(c, loop_) + driver_(c, loop_) { - proton::connection conn = engine_.connection(); + proton::connection conn = driver_.connection(); proton::io::set_link_namer(conn, c.link_namer); } - ~pollable_engine() { + ~pollable_driver() { loop_->close(); // No calls to notify() after this. - engine_.dispatch(); // Run any final events. + driver_.dispatch(); // Run any final events. try { write(); } catch(...) {} // Write connection close if we can. for (auto f : loop_->pop_all()) {// Run final queued work for side-effects. try { f(); } catch(...) {} @@ -303,17 +303,17 @@ class pollable_engine : public pollable { can_read = can_read && read(); for (auto f : loop_->pop_all()) // Run queued work f(); - engine_.dispatch(); + driver_.dispatch(); } while (can_read || can_write); - return (engine_.read_buffer().size ? EPOLLIN:0) | - (engine_.write_buffer().size ? EPOLLOUT:0); + return (driver_.read_buffer().size ? EPOLLIN:0) | + (driver_.write_buffer().size ? 
EPOLLOUT:0); } catch (const std::exception& e) { - engine_.disconnected(proton::error_condition("exception", e.what())); + driver_.disconnected(proton::error_condition("exception", e.what())); } return 0; // Ending } - proton::io::connection_engine& engine() { return engine_; } + proton::io::connection_driver& driver() { return driver_; } private: static bool try_again(int e) { @@ -322,11 +322,11 @@ class pollable_engine : public pollable { } bool write() { - proton::io::const_buffer wbuf(engine_.write_buffer()); + proton::io::const_buffer wbuf(driver_.write_buffer()); if (wbuf.size) { ssize_t n = ::write(fd_, wbuf.data, wbuf.size); if (n > 0) { - engine_.write_done(n); + driver_.write_done(n); return true; } else if (n < 0 && !try_again(errno)) { check(n, "write"); @@ -336,15 +336,15 @@ class pollable_engine : public pollable { } bool read() { - proton::io::mutable_buffer rbuf(engine_.read_buffer()); + proton::io::mutable_buffer rbuf(driver_.read_buffer()); if (rbuf.size) { ssize_t n = ::read(fd_, rbuf.data, rbuf.size); if (n > 0) { - engine_.read_done(n); + driver_.read_done(n); return true; } else if (n == 0) - engine_.read_close(); + driver_.read_close(); else if (!try_again(errno)) check(n, "read"); } @@ -352,13 +352,13 @@ class pollable_engine : public pollable { } // Lifecycle note: loop_ belongs to the proton::connection, which can live - // longer than the engine if the application holds a reference to it, we - // disconnect ourselves with loop_->close() in ~connection_engine() + // longer than the driver if the application holds a reference to it, we + // disconnect ourselves with loop_->close() in ~connection_driver() epoll_event_loop* loop_; - proton::io::connection_engine engine_; + proton::io::connection_driver driver_; }; -// A pollable listener fd that creates pollable_engine for incoming connections. +// A pollable listener fd that creates pollable_driver for incoming connections. 
class pollable_listener : public pollable { public: pollable_listener( @@ -380,7 +380,7 @@ class pollable_listener : public pollable { } try { int accepted = check(::accept(fd_, NULL, 0), "accept"); - container_.add_engine(listener_.on_accept(), accepted, true); + container_.add_driver(listener_.on_accept(), accepted, true); return EPOLLIN; } catch (const std::exception& e) { listener_.on_error(e.what()); @@ -424,25 +424,25 @@ epoll_container::~epoll_container() { } catch (...) {} } -proton::connection epoll_container::add_engine(proton::connection_options opts, int fd, bool server) +proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server) { lock_guard g(lock_); if (stopping_) throw proton::error("container is stopping"); - std::unique_ptr<pollable_engine> eng(new pollable_engine(*this, fd, epoll_fd_)); + std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_)); if (server) - eng->engine().accept(opts); + eng->driver().accept(opts); else - eng->engine().connect(opts); - proton::connection c = eng->engine().connection(); + eng->driver().connect(opts); + proton::connection c = eng->driver().connection(); eng->notify(); - engines_[eng.get()] = std::move(eng); + drivers_[eng.get()] = std::move(eng); return c; } void epoll_container::erase(pollable* e) { lock_guard g(lock_); - if (!engines_.erase(e)) { + if (!drivers_.erase(e)) { pollable_listener* l = dynamic_cast<pollable_listener*>(e); if (l) listeners_.erase(l->addr()); @@ -451,7 +451,7 @@ void epoll_container::erase(pollable* e) { } void epoll_container::idle_check(const lock_guard&) { - if (stopping_ && engines_.empty() && listeners_.empty()) + if (stopping_ && drivers_.empty() && listeners_.empty()) interrupt(); } @@ -462,7 +462,7 @@ proton::returned<proton::connection> epoll_container::connect( unique_addrinfo ainfo(addr); unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg)); check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), 
msg); - return make_thread_safe(add_engine(opts, fd.release(), false)); + return make_thread_safe(add_driver(opts, fd.release(), false)); } proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) { @@ -520,10 +520,10 @@ void epoll_container::stop(const proton::error_condition& err) { void epoll_container::wait() { std::unique_lock<std::mutex> l(lock_); stopped_.wait(l, [this]() { return this->threads_ == 0; } ); - for (auto& eng : engines_) - eng.second->engine().disconnected(stop_err_); + for (auto& eng : drivers_) + eng.second->driver().disconnected(stop_err_); listeners_.clear(); - engines_.clear(); + drivers_.clear(); } void epoll_container::interrupt() { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index ddab147..ffc6e10 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -369,7 +369,7 @@ set (qpid-proton-core src/core/encoder.c src/core/dispatcher.c - src/core/connection_engine.c + src/core/connection_driver.c src/core/engine.c src/core/event.c src/core/autodetect.c @@ -440,7 +440,7 @@ set (qpid-proton-include include/proton/codec.h include/proton/condition.h include/proton/connection.h - include/proton/connection_engine.h + include/proton/connection_driver.h include/proton/delivery.h include/proton/disposition.h include/proton/engine.h http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index ed969eb..6af4319 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -48,7 +48,7 @@ set(qpid-proton-cpp-source src/error_condition.cpp src/event_loop.cpp src/handler.cpp - 
src/io/connection_engine.cpp + src/io/connection_driver.cpp src/io/link_namer.cpp src/link.cpp src/listener.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/docs/io.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/docs/io.md b/proton-c/bindings/cpp/docs/io.md index a892e61..230e538 100644 --- a/proton-c/bindings/cpp/docs/io.md +++ b/proton-c/bindings/cpp/docs/io.md @@ -7,16 +7,16 @@ The `proton::io` namespace contains a service provider interface (SPI) that allows you to implement the Proton API over alternative IO or threading libraries. -The `proton::io::connection_engine` class converts an AMQP-encoded +The `proton::io::connection_driver` class converts an AMQP-encoded byte stream, read from any IO source, into `proton::messaging_handler` calls. It generates an AMQP-encoded byte stream as output that can be written to any IO destination. -The connection engine is deliberately very simple and low level. It +The connection driver is deliberately very simple and low level. It performs no IO of its own, no thread-related locking, and is written in simple C++98-compatible code. -The connection engine can be used standalone as an AMQP translator, or +The connection driver can be used standalone as an AMQP translator, or you can implement the following two interfaces to provide a complete implementation of the Proton API that can run any Proton application: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/docs/main.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/docs/main.md b/proton-c/bindings/cpp/docs/main.md index 011df29..93ba2c0 100644 --- a/proton-c/bindings/cpp/docs/main.md +++ b/proton-c/bindings/cpp/docs/main.md @@ -123,6 +123,6 @@ The default container implementation is created using `proton::default_container`. 
You can implement your own container to integrate proton with any IO -provider using the `proton::io::connection_engine`. +provider using the `proton::io::connection_driver`. @see @ref io_page http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/connection_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp index 9fbdbdc..d2deebf 100644 --- a/proton-c/bindings/cpp/include/proton/connection_options.hpp +++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp @@ -40,7 +40,7 @@ class proton_handler; class connection; namespace io { -class connection_engine; +class connection_driver; } /// Options for creating a connection. @@ -163,7 +163,7 @@ class connection_options { /// @cond INTERNAL friend class container_impl; friend class connector; - friend class io::connection_engine; + friend class io::connection_driver; friend class connection; /// @endcond }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp new file mode 100644 index 0000000..d5da718 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp @@ -0,0 +1,211 @@ +#ifndef PROTON_IO_CONNECTION_DRIVER_HPP +#define PROTON_IO_CONNECTION_DRIVER_HPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "../internal/config.hpp" +#include "../connection.hpp" +#include "../connection_options.hpp" +#include "../error.hpp" +#include "../error_condition.hpp" +#include "../internal/export.hpp" +#include "../internal/pn_unique_ptr.hpp" +#include "../transport.hpp" +#include "../types.hpp" + +#include <proton/connection_driver.h> + +#include <cstddef> +#include <utility> +#include <string> + +namespace proton { + +class event_loop; +class proton_handler; + +namespace io { + +/// **Experimental** - Pointer to a mutable memory region with a size. +struct mutable_buffer { + char* data; ///< Beginning of the buffered data. + size_t size; ///< Number of bytes in the buffer. + + /// Construct a buffer starting at data_ with size_ bytes. + mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {} +}; + +/// **Experimental** - Pointer to a const memory region with a size. +struct const_buffer { + const char* data; ///< Beginning of the buffered data. + size_t size; ///< Number of bytes in the buffer. + + /// Construct a buffer starting at data_ with size_ bytes. + const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {} +}; + +/// **Experimental** - An AMQP driver for a single connection. +/// +/// io::connection_driver manages a single proton::connection and dispatches +/// events to a proton::messaging_handler. 
It does no IO of its own, but allows you to +/// integrate AMQP protocol handling into any IO or concurrency framework. +/// +/// The application is coded the same way as for the +/// proton::container. The application implements a +/// proton::messaging_handler to respond to transport, connection, +/// session, link, and message events. With a little care, the same +/// handler classes can be used for both container and +/// connection_driver. the @ref broker.cpp example illustrates this. +/// +/// You need to write the IO code to read AMQP data to the +/// read_buffer(). The engine parses the AMQP frames. dispatch() calls +/// the appropriate functions on the applications proton::messaging_handler. You +/// write output data from the engine's write_buffer() to your IO. +/// +/// The engine is not safe for concurrent use, but you can process +/// different engines concurrently. A common pattern for +/// high-performance servers is to serialize read/write activity +/// per connection and dispatch in a fixed-size thread pool. +/// +/// The engine is designed to work with a classic reactor (e.g., +/// select, poll, epoll) or an async-request driven proactor (e.g., +/// windows completion ports, boost.asio, libuv). +/// +/// The engine never throws exceptions. +class +PN_CPP_CLASS_EXTERN connection_driver { + public: + /// An engine that is not associated with a proton::container or + /// proton::event_loop. + /// + /// Accessing the container or event_loop for this connection in + /// a proton::messaging_handler will throw a proton::error exception. + /// + PN_CPP_EXTERN connection_driver(); + + /// Create a connection driver associated with a proton::container and + /// optional event_loop. If the event_loop is not provided attempts to use + /// it will throw proton::error. + /// + /// Takes ownership of the event_loop. 
Note the proton::connection created + /// by this connection_driver can outlive the connection_driver itself if + /// the user pins it in memory using the proton::thread_safe<> template. + /// The event_loop is deleted when, and only when, the proton::connection is. + /// + PN_CPP_EXTERN connection_driver(proton::container&, event_loop* loop = 0); + + PN_CPP_EXTERN ~connection_driver(); + + /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler) + /// Does not apply any default options, to apply container defaults use connect() or accept() + /// instead. If server==true, configure a server connection. + void configure(const connection_options& opts=connection_options(), bool server=false); + + /// Call configure() with client options and call connection::open() + /// Options applied: container::id(), container::client_connection_options(), opts. + PN_CPP_EXTERN void connect(const connection_options& opts); + + /// Call configure() with server options. + /// Options applied: container::id(), container::server_connection_options(), opts. + /// + /// Note this does not call connection::open(). If there is a messaging_handler in the + /// composed options it will receive messaging_handler::on_connection_open() and can + /// respond with connection::open() or connection::close() + PN_CPP_EXTERN void accept(const connection_options& opts); + + /// The engine's read buffer. Read data into this buffer then call read_done() when complete. + /// Returns mutable_buffer(0, 0) if the engine cannot currently read data. + /// Calling dispatch() may open up more buffer space. + PN_CPP_EXTERN mutable_buffer read_buffer(); + + /// Indicate that the first n bytes of read_buffer() have valid data. + /// This changes the buffer, call read_buffer() to get the updated buffer. + PN_CPP_EXTERN void read_done(size_t n); + + /// Indicate that the read side of the transport is closed and no more data will be read. 
+ /// Note that there may still be events to dispatch() or data to write. + PN_CPP_EXTERN void read_close(); + + /// The engine's write buffer. Write data from this buffer then call write_done() + /// Returns const_buffer(0, 0) if the engine has nothing to write. + /// Calling dispatch() may generate more data in the write buffer. + PN_CPP_EXTERN const_buffer write_buffer(); + + /// Indicate that the first n bytes of write_buffer() have been written successfully. + /// This changes the buffer, call write_buffer() to get the updated buffer. + PN_CPP_EXTERN void write_done(size_t n); + + /// Indicate that the write side of the transport has closed and no more data can be written. + /// Note that there may still be events to dispatch() or data to read. + PN_CPP_EXTERN void write_close(); + + /// Inform the engine that the transport has been disconnected unexpectedly, + /// without completing the AMQP connection close sequence. + /// + /// This calls read_close(), write_close(), sets the transport().error() and + /// queues an `on_transport_error` event. You must call dispatch() one more + /// time to dispatch the messaging_handler::on_transport_error() call and other final + /// events. + /// + /// Note this does not close the connection() so that a proton::messaging_handler can + /// distinguish between a connection close error sent by the remote peer and + /// a transport failure. + /// + PN_CPP_EXTERN void disconnected(const error_condition& = error_condition()); + + /// Dispatch all available events and call the corresponding \ref messaging_handler methods. + /// + /// Returns true if the engine is still active, false if it is finished and + /// can be destroyed. The engine is finished when all events are dispatched + /// and one of the following is true: + /// + /// - both read_close() and write_close() have been called, no more IO is possible. + /// - The AMQP connection() is closed AND the write_buffer() is empty. 
+ /// + /// May modify the read_buffer() and/or the write_buffer(). + /// + PN_CPP_EXTERN bool dispatch(); + + /// Get the AMQP connection associated with this connection_driver. + /// The event_loop is available via proton::thread_safe<connection>(connection()) + PN_CPP_EXTERN proton::connection connection() const; + + /// Get the transport associated with this connection_driver. + PN_CPP_EXTERN proton::transport transport() const; + + /// Get the container associated with this connection_driver, if there is one. + PN_CPP_EXTERN proton::container* container() const; + + private: + void init(); + connection_driver(const connection_driver&); + connection_driver& operator=(const connection_driver&); + + messaging_handler* handler_; + proton::container* container_; + pn_connection_driver_t driver_; +}; + +} // io +} // proton + +#endif // PROTON_IO_CONNECTION_DRIVER_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp deleted file mode 100644 index d9825c2..0000000 --- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp +++ /dev/null @@ -1,215 +0,0 @@ -#ifndef PROTON_IO_CONNECTION_ENGINE_HPP -#define PROTON_IO_CONNECTION_ENGINE_HPP - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "../internal/config.hpp" -#include "../connection.hpp" -#include "../connection_options.hpp" -#include "../error.hpp" -#include "../error_condition.hpp" -#include "../internal/export.hpp" -#include "../internal/pn_unique_ptr.hpp" -#include "../transport.hpp" -#include "../types.hpp" - -#include <proton/connection_engine.h> - -#include <cstddef> -#include <utility> -#include <string> - -namespace proton { - -class event_loop; -class proton_handler; - -namespace io { - -/// **Experimental** - Pointer to a mutable memory region with a size. -struct mutable_buffer { - char* data; ///< Beginning of the buffered data. - size_t size; ///< Number of bytes in the buffer. - - /// Construct a buffer starting at data_ with size_ bytes. - mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {} -}; - -/// **Experimental** - Pointer to a const memory region with a size. -struct const_buffer { - const char* data; ///< Beginning of the buffered data. - size_t size; ///< Number of bytes in the buffer. - - /// Construct a buffer starting at data_ with size_ bytes. - const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {} -}; - -/// **Experimental** - An AMQP protocol engine for a single -/// connection. -/// -/// A connection_engine is a protocol engine that integrates AMQP into -/// any IO or concurrency framework. -/// -/// io::connection_engine manages a single proton::connection and dispatches -/// events to a proton::messaging_handler. 
It does no IO of its own, but allows you to -/// integrate AMQP protocol handling into any IO or concurrency framework. -/// -/// The application is coded the same way as for the -/// proton::container. The application implements a -/// proton::messaging_handler to respond to transport, connection, -/// session, link, and message events. With a little care, the same -/// handler classes can be used for both container and -/// connection_engine. the @ref broker.cpp example illustrates this. -/// -/// You need to write the IO code to read AMQP data to the -/// read_buffer(). The engine parses the AMQP frames. dispatch() calls -/// the appropriate functions on the applications proton::messaging_handler. You -/// write output data from the engine's write_buffer() to your IO. -/// -/// The engine is not safe for concurrent use, but you can process -/// different engines concurrently. A common pattern for -/// high-performance servers is to serialize read/write activity -/// per connection and dispatch in a fixed-size thread pool. -/// -/// The engine is designed to work with a classic reactor (e.g., -/// select, poll, epoll) or an async-request driven proactor (e.g., -/// windows completion ports, boost.asio, libuv). -/// -/// The engine never throws exceptions. -class -PN_CPP_CLASS_EXTERN connection_engine { - public: - /// An engine that is not associated with a proton::container or - /// proton::event_loop. - /// - /// Accessing the container or event_loop for this connection in - /// a proton::messaging_handler will throw a proton::error exception. - /// - PN_CPP_EXTERN connection_engine(); - - /// Create a connection engine associated with a proton::container and - /// optional event_loop. If the event_loop is not provided attempts to use - /// it will throw proton::error. - /// - /// Takes ownership of the event_loop. 
Note the proton::connection created - /// by this connection_engine can outlive the connection_engine itself if - /// the user pins it in memory using the proton::thread_safe<> template. - /// The event_loop is deleted when, and only when, the proton::connection is. - /// - PN_CPP_EXTERN connection_engine(proton::container&, event_loop* loop = 0); - - PN_CPP_EXTERN ~connection_engine(); - - /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler) - /// Does not apply any default options, to apply container defaults use connect() or accept() - /// instead. If server==true, configure a server connection. - void configure(const connection_options& opts=connection_options(), bool server=false); - - /// Call configure() with client options and call connection::open() - /// Options applied: container::id(), container::client_connection_options(), opts. - PN_CPP_EXTERN void connect(const connection_options& opts); - - /// Call configure() with server options. - /// Options applied: container::id(), container::server_connection_options(), opts. - /// - /// Note this does not call connection::open(). If there is a messaging_handler in the - /// composed options it will receive messaging_handler::on_connection_open() and can - /// respond with connection::open() or connection::close() - PN_CPP_EXTERN void accept(const connection_options& opts); - - /// The engine's read buffer. Read data into this buffer then call read_done() when complete. - /// Returns mutable_buffer(0, 0) if the engine cannot currently read data. - /// Calling dispatch() may open up more buffer space. - PN_CPP_EXTERN mutable_buffer read_buffer(); - - /// Indicate that the first n bytes of read_buffer() have valid data. - /// This changes the buffer, call read_buffer() to get the updated buffer. - PN_CPP_EXTERN void read_done(size_t n); - - /// Indicate that the read side of the transport is closed and no more data will be read. 
- /// Note that there may still be events to dispatch() or data to write. - PN_CPP_EXTERN void read_close(); - - /// The engine's write buffer. Write data from this buffer then call write_done() - /// Returns const_buffer(0, 0) if the engine has nothing to write. - /// Calling dispatch() may generate more data in the write buffer. - PN_CPP_EXTERN const_buffer write_buffer(); - - /// Indicate that the first n bytes of write_buffer() have been written successfully. - /// This changes the buffer, call write_buffer() to get the updated buffer. - PN_CPP_EXTERN void write_done(size_t n); - - /// Indicate that the write side of the transport has closed and no more data can be written. - /// Note that there may still be events to dispatch() or data to read. - PN_CPP_EXTERN void write_close(); - - /// Inform the engine that the transport been disconnected unexpectedly, - /// without completing the AMQP connection close sequence. - /// - /// This calls read_close(), write_close(), sets the transport().error() and - /// queues an `on_transport_error` event. You must call dispatch() one more - /// time to dispatch the messaging_handler::on_transport_error() call and other final - /// events. - /// - /// Note this does not close the connection() so that a proton::messaging_handler can - /// distinguish between a connection close error sent by the remote peer and - /// a transport failure. - /// - PN_CPP_EXTERN void disconnected(const error_condition& = error_condition()); - - /// Dispatch all available events and call the corresponding \ref messaging_handler methods. - /// - /// Returns true if the engine is still active, false if it is finished and - /// can be destroyed. The engine is finished when all events are dispatched - /// and one of the following is true: - /// - /// - both read_close() and write_close() have been called, no more IO is possible. - /// - The AMQP connection() is closed AND the write_buffer() is empty. 
- /// - /// May modify the read_buffer() and/or the write_buffer(). - /// - PN_CPP_EXTERN bool dispatch(); - - /// Get the AMQP connection associated with this connection_engine. - /// The event_loop is availabe via proton::thread_safe<connection>(connection()) - PN_CPP_EXTERN proton::connection connection() const; - - /// Get the transport associated with this connection_engine. - PN_CPP_EXTERN proton::transport transport() const; - - /// Get the container associated with this connection_engine, if there is one. - PN_CPP_EXTERN proton::container* container() const; - - private: - void init(); - connection_engine(const connection_engine&); - connection_engine& operator=(const connection_engine&); - - messaging_handler* handler_; - proton::container* container_; - pn_connection_engine_t engine_; -}; - -} // io -} // proton - -#endif // PROTON_IO_CONNECTION_ENGINE_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/messaging_handler.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp index 2c5423f..acdcd30 100644 --- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp +++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp @@ -40,7 +40,7 @@ class message; class messaging_adapter; namespace io { -class connection_engine; +class connection_driver; } /// A handler for Proton messaging events. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/include/proton/transport.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp index bcd8a2f..10641e0 100644 --- a/proton-c/bindings/cpp/include/proton/transport.hpp +++ b/proton-c/bindings/cpp/include/proton/transport.hpp @@ -35,7 +35,7 @@ class error_condition; class sasl; namespace io { -class connection_engine; +class connection_driver; } /// A network channel supporting an AMQP connection. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/engine_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp index 6c3341f..991836d 100644 --- a/proton-c/bindings/cpp/src/engine_test.cpp +++ b/proton-c/bindings/cpp/src/engine_test.cpp @@ -24,7 +24,7 @@ #include "proton/container.hpp" #include "proton/uuid.hpp" -#include "proton/io/connection_engine.hpp" +#include "proton/io/connection_driver.hpp" #include "proton/io/link_namer.hpp" #include "proton/messaging_handler.hpp" #include "proton/types_fwd.hpp" @@ -37,7 +37,7 @@ namespace { using namespace std; using namespace proton; -using proton::io::connection_engine; +using proton::io::connection_driver; using proton::io::const_buffer; using proton::io::mutable_buffer; @@ -45,14 +45,14 @@ using test::dummy_container; typedef std::deque<char> byte_stream; -/// In memory connection_engine that reads and writes from byte_streams -struct in_memory_engine : public connection_engine { +/// In memory connection_driver that reads and writes from byte_streams +struct in_memory_engine : public connection_driver { byte_stream& reads; byte_stream& writes; in_memory_engine(byte_stream& rd, byte_stream& wr, class container& cont) : - 
connection_engine(cont), reads(rd), writes(wr) {} + connection_driver(cont), reads(rd), writes(wr) {} void do_read() { mutable_buffer rbuf = read_buffer(); @@ -247,7 +247,7 @@ void test_engine_disconnected() { void test_no_container() { // An engine with no container should throw, not crash. - connection_engine e; + connection_driver e; try { e.connection().container(); FAIL("expected error"); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/include/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp index 74a763c..1d4194e 100644 --- a/proton-c/bindings/cpp/src/include/contexts.hpp +++ b/proton-c/bindings/cpp/src/include/contexts.hpp @@ -24,7 +24,7 @@ #include "proton/connection.hpp" #include "proton/container.hpp" -#include "proton/io/connection_engine.hpp" +#include "proton/io/connection_driver.hpp" #include "proton/event_loop.hpp" #include "proton/listen_handler.hpp" #include "proton/message.hpp" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/25706a47/proton-c/bindings/cpp/src/io/connection_driver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp new file mode 100644 index 0000000..06b01d8 --- /dev/null +++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/io/connection_driver.hpp" + +#include "proton/event_loop.hpp" +#include "proton/error.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/uuid.hpp" + +#include "contexts.hpp" +#include "messaging_adapter.hpp" +#include "msg.hpp" +#include "proton_bits.hpp" +#include "proton_event.hpp" + +#include <proton/connection.h> +#include <proton/transport.h> +#include <proton/event.h> + +#include <algorithm> + + +namespace proton { +namespace io { + +void connection_driver::init() { + if (pn_connection_driver_init(&driver_, pn_connection(), pn_transport()) != 0) { + this->~connection_driver(); // Dtor won't be called on throw from ctor. 
+ throw proton::error(std::string("connection_driver allocation failed")); + } +} + +connection_driver::connection_driver() : handler_(0), container_(0) { init(); } + +connection_driver::connection_driver(class container& cont, event_loop* loop) : handler_(0), container_(&cont) { + init(); + connection_context& ctx = connection_context::get(connection()); + ctx.container = container_; + ctx.event_loop.reset(loop); +} + +connection_driver::~connection_driver() { + pn_connection_driver_destroy(&driver_); +} + +// FIXME aconway 2016-11-16: rename _engine > _driver +void connection_driver::configure(const connection_options& opts, bool server) { + proton::connection c(connection()); + opts.apply_unbound(c); + if (server) pn_transport_set_server(driver_.transport); + pn_connection_driver_bind(&driver_); + opts.apply_bound(c); + handler_ = opts.handler(); + connection_context::get(connection()).collector = + pn_connection_collector(driver_.connection); +} + +void connection_driver::connect(const connection_options& opts) { + connection_options all; + if (container_) { + all.container_id(container_->id()); + all.update(container_->client_connection_options()); + } + all.update(opts); + configure(all, false); + connection().open(); +} + +void connection_driver::accept(const connection_options& opts) { + connection_options all; + if (container_) { + all.container_id(container_->id()); + all.update(container_->server_connection_options()); + } + all.update(opts); + configure(all, true); +} + +bool connection_driver::dispatch() { + pn_event_t* c_event; + while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) { + proton_event cpp_event(c_event, container_); + try { + if (handler_ != 0) { + messaging_adapter adapter(*handler_); + cpp_event.dispatch(adapter); + } + } catch (const std::exception& e) { + pn_condition_t *cond = pn_transport_condition(driver_.transport); + if (!pn_condition_is_set(cond)) { + pn_condition_format(cond, "exception", "%s", e.what()); + } 
+ } + } + return !pn_connection_driver_finished(&driver_); +} + +mutable_buffer connection_driver::read_buffer() { + pn_rwbytes_t buffer = pn_connection_driver_read_buffer(&driver_); + return mutable_buffer(buffer.start, buffer.size); +} + +void connection_driver::read_done(size_t n) { + return pn_connection_driver_read_done(&driver_, n); +} + +void connection_driver::read_close() { + pn_connection_driver_read_close(&driver_); +} + +const_buffer connection_driver::write_buffer() { + pn_bytes_t buffer = pn_connection_driver_write_buffer(&driver_); + return const_buffer(buffer.start, buffer.size); +} + +void connection_driver::write_done(size_t n) { + return pn_connection_driver_write_done(&driver_, n); +} + +void connection_driver::write_close() { + pn_connection_driver_write_close(&driver_); +} + +void connection_driver::disconnected(const proton::error_condition& err) { + pn_condition_t* condition = pn_transport_condition(driver_.transport); + if (!pn_condition_is_set(condition)) { + set_error_condition(err, condition); + } + pn_connection_driver_close(&driver_); +} + +proton::connection connection_driver::connection() const { + return make_wrapper(driver_.connection); +} + +proton::transport connection_driver::transport() const { + return make_wrapper(driver_.transport); +} + +proton::container* connection_driver::container() const { + return container_; +} + +}} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
