Repository: qpid-proton Updated Branches: refs/heads/master 99df0a33d -> fe31dcea7
added pn_event_reactor Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fe31dcea Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fe31dcea Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fe31dcea Branch: refs/heads/master Commit: fe31dcea7033eea2faf2700d287ae9f94d78abda Parents: 99df0a3 Author: Rafael Schloming <r...@alum.mit.edu> Authored: Tue Jan 13 18:32:29 2015 -0500 Committer: Rafael Schloming <r...@alum.mit.edu> Committed: Tue Jan 13 18:32:29 2015 -0500 ---------------------------------------------------------------------- proton-c/include/proton/reactor.h | 2 + proton-c/src/reactor/acceptor.c | 1 + proton-c/src/reactor/reactor.c | 77 ++++++++++++++++++++++++++++++++-- proton-c/src/reactor/reactor.h | 2 + proton-c/src/tests/reactor.c | 27 +++++++----- 5 files changed, 94 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/include/proton/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h index e5d953f..1b21195 100644 --- a/proton-c/include/proton/reactor.h +++ b/proton-c/include/proton/reactor.h @@ -85,6 +85,8 @@ PN_EXTERN int pn_timer_tasks(pn_timer_t *timer); PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task); +PN_EXTERN pn_reactor_t *pn_event_reactor(pn_event_t *event); + /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/acceptor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c index 889016c..34446a2 100644 --- a/proton-c/src/reactor/acceptor.c +++ b/proton-c/src/reactor/acceptor.c @@ -63,6 +63,7 @@ pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, cons pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port); pni_selectable_set_fd(sel, socket); pni_selectable_set_context(sel, reactor); + pni_record_init_reactor(pn_selectable_attachments(sel), reactor); pni_record_init_handler(pn_selectable_attachments(sel), handler); pn_reactor_update(reactor, sel); return (pn_acceptor_t *) sel; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index 3857dd7..82b903d 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -23,15 +23,16 @@ #include <proton/io.h> #include <proton/selector.h> #include <proton/event.h> -#include <proton/reactor.h> #include <proton/transport.h> #include <proton/connection.h> #include <proton/session.h> #include <proton/link.h> +#include <proton/delivery.h> #include <stdio.h> #include <stdlib.h> #include <assert.h> +#include "reactor.h" #include "selectable.h" #include "platform.h" @@ -173,8 +174,21 @@ void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event); void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event); void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event); -static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event) { +static void pni_reactor_dispatch_pre(pn_reactor_t *reactor, pn_event_t *event) { assert(reactor); + assert(event); + switch (pn_event_type(event)) { + case PN_CONNECTION_INIT: + pni_record_init_reactor(pn_connection_attachments(pn_event_connection(event)), reactor); + break; + default: + break; + } +} + +static void pni_reactor_dispatch_post(pn_reactor_t *reactor, pn_event_t *event) { + assert(reactor); + assert(event); switch (pn_event_type(event)) { case PN_TRANSPORT: pni_handle_transport(reactor, event); @@ -202,6 +216,58 @@ void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler) { pn_record_set(record, PN_HANDLER, handler); } +static void *pni_reactor = NULL; +#define PN_REACTOR ((pn_handle_t) &pni_reactor) + +pn_reactor_t *pni_record_get_reactor(pn_record_t *record) { + return (pn_reactor_t *) pn_record_get(record, PN_REACTOR); +} + +void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor) { + pn_record_def(record, PN_REACTOR, PN_WEAKREF); + pn_record_set(record, PN_REACTOR, reactor); +} + +static pn_connection_t *pni_object_connection(const pn_class_t *clazz, void *object) { + switch (pn_class_id(clazz)) { + case CID_pn_delivery: + return pn_session_connection(pn_link_session(pn_delivery_link((pn_delivery_t *) object))); + case CID_pn_link: + return pn_session_connection(pn_link_session((pn_link_t *) object)); + case CID_pn_session: + return pn_session_connection((pn_session_t *) object); + case CID_pn_connection: + return (pn_connection_t *) object; + case CID_pn_transport: + return pn_transport_connection((pn_transport_t *) object); + default: + return NULL; + } +} + +pn_reactor_t *pn_event_reactor(pn_event_t *event) { + const pn_class_t *clazz = pn_event_class(event); + void *context = pn_event_context(event); + switch (pn_class_id(clazz)) { + case CID_pn_reactor: + return (pn_reactor_t *) context; + case CID_pn_task: + return pni_record_get_reactor(pn_task_attachments((pn_task_t *) context)); + case CID_pn_delivery: + case CID_pn_link: + case CID_pn_session: + case CID_pn_connection: + case CID_pn_transport: + { + pn_connection_t *conn = pni_object_connection(pn_event_class(event), context); + pn_record_t *record = pn_connection_attachments(conn); + return pni_record_get_reactor(record); + } + default: + return NULL; + } +} + pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) { pn_handler_t *handler = NULL; pn_link_t *link = pn_event_link(event); @@ -229,7 +295,9 @@ pn_handler_t *pn_event_handler(pn_event_t *event, pn_handler_t *default_handler) pn_task_t *pn_reactor_schedule(pn_reactor_t *reactor, int delay, pn_handler_t *handler) { pn_timer_t *timer = pni_timer(reactor->timer); pn_task_t *task = pn_timer_schedule(timer, reactor->now + delay); - pni_record_init_handler(pn_task_attachments(task), handler); + pn_record_t *record = pn_task_attachments(task); + pni_record_init_reactor(record, reactor); + pni_record_init_handler(record, handler); pn_reactor_update(reactor, reactor->timer); return task; } @@ -238,9 +306,10 @@ void pn_reactor_process(pn_reactor_t *reactor) { assert(reactor); pn_event_t *event; while ((event = pn_collector_peek(reactor->collector))) { + pni_reactor_dispatch_pre(reactor, event); pn_handler_t *handler = pn_event_handler(event, reactor->handler); pn_handler_dispatch(handler, event); - pni_reactor_dispatch(reactor, event); + pni_reactor_dispatch_post(reactor, event); pn_collector_pop(reactor->collector); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/reactor/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.h b/proton-c/src/reactor/reactor.h index e21274d..398eb8b 100644 --- a/proton-c/src/reactor/reactor.h +++ b/proton-c/src/reactor/reactor.h @@ -26,5 +26,7 @@ pn_handler_t *pni_record_get_handler(pn_record_t *record); void pni_record_init_handler(pn_record_t *record, pn_handler_t *handler); +void pni_record_init_reactor(pn_record_t *record, pn_reactor_t *reactor); + #endif /* src/reactor.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fe31dcea/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c index 4eff533..ce8c5cf 100644 --- a/proton-c/src/tests/reactor.c +++ b/proton-c/src/tests/reactor.c @@ -46,6 +46,7 @@ static void test_reactor_run(void) { } typedef struct { + pn_reactor_t *reactor; pn_list_t *events; } pni_test_handler_t; @@ -54,11 +55,15 @@ pni_test_handler_t *thmem(pn_handler_t *handler) { } void test_dispatch(pn_handler_t *handler, pn_event_t *event) { - pn_list_add(thmem(handler)->events, (void *) pn_event_type(event)); + pni_test_handler_t *th = thmem(handler); + pn_reactor_t *reactor = pn_event_reactor(event); + assert(reactor == th->reactor); + pn_list_add(th->events, (void *) pn_event_type(event)); } -pn_handler_t *test_handler(pn_list_t *events) { +pn_handler_t *test_handler(pn_reactor_t *reactor, pn_list_t *events) { pn_handler_t *handler = pn_handler_new(test_dispatch, sizeof(pni_test_handler_t), NULL); + thmem(handler)->reactor = reactor; thmem(handler)->events = events; return handler; } @@ -89,7 +94,7 @@ static void test_reactor_handler(void) { pn_handler_t *handler = pn_reactor_handler(reactor); assert(handler); pn_list_t *events = pn_list(PN_VOID, 0); - pn_handler_t *th = test_handler(events); + pn_handler_t *th = test_handler(reactor, events); pn_handler_add(handler, th); pn_decref(th); pn_free(reactor); @@ -103,7 +108,7 @@ static void test_reactor_handler_free(void) { pn_handler_t *handler = pn_reactor_handler(reactor); assert(handler); pn_list_t *events = pn_list(PN_VOID, 0); - pn_handler_add(handler, test_handler(events)); + pn_handler_add(handler, test_handler(reactor, events)); pn_reactor_free(reactor); expect(events, END); pn_free(events); @@ -115,7 +120,7 @@ static void test_reactor_handler_run(void) { pn_handler_t *handler = pn_reactor_handler(reactor); assert(handler); pn_list_t *events = pn_list(PN_VOID, 0); - pn_handler_t *th = test_handler(events); + pn_handler_t *th = test_handler(reactor, events); pn_handler_add(handler, th); pn_reactor_run(reactor); expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); @@ -130,7 +135,7 @@ static void test_reactor_handler_run_free(void) { pn_handler_t *handler = pn_reactor_handler(reactor); assert(handler); pn_list_t *events = pn_list(PN_VOID, 0); - pn_handler_add(handler, test_handler(events)); + pn_handler_add(handler, test_handler(reactor, events)); pn_reactor_run(reactor); expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); pn_reactor_free(reactor); @@ -141,12 +146,12 @@ static void test_reactor_connection(void) { pn_reactor_t *reactor = pn_reactor(); assert(reactor); pn_list_t *cevents = pn_list(PN_VOID, 0); - pn_handler_t *tch = test_handler(cevents); + pn_handler_t *tch = test_handler(reactor, cevents); pn_connection_t *connection = pn_reactor_connection(reactor, tch); assert(connection); pn_handler_t *root = pn_reactor_handler(reactor); pn_list_t *revents = pn_list(PN_VOID, 0); - pn_handler_add(root, test_handler(revents)); + pn_handler_add(root, test_handler(reactor, revents)); pn_reactor_run(reactor); expect(revents, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); expect(cevents, PN_CONNECTION_INIT, END); @@ -394,7 +399,7 @@ static void test_reactor_schedule(void) { pn_reactor_t *reactor = pn_reactor(); pn_handler_t *root = pn_reactor_handler(reactor); pn_list_t *events = pn_list(PN_VOID, 0); - pn_handler_add(root, test_handler(events)); + pn_handler_add(root, test_handler(reactor, events)); pn_reactor_schedule(reactor, 0, NULL); pn_reactor_run(reactor); pn_reactor_free(reactor); @@ -407,8 +412,8 @@ static void test_reactor_schedule_handler(void) { pn_handler_t *root = pn_reactor_handler(reactor); pn_list_t *events = pn_list(PN_VOID, 0); pn_list_t *tevents = pn_list(PN_VOID, 0); - pn_handler_add(root, test_handler(events)); - pn_handler_t *th = test_handler(tevents); + pn_handler_add(root, test_handler(reactor, events)); + pn_handler_t *th = test_handler(reactor, tevents); pn_reactor_schedule(reactor, 0, th); pn_reactor_run(reactor); pn_reactor_free(reactor); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org