PROTON-1344: proactor timeout support
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2c8a3a3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2c8a3a3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2c8a3a3 Branch: refs/heads/master Commit: f2c8a3a38439c12d52dd93aed9173280c5754c9e Parents: 25706a4 Author: Alan Conway <[email protected]> Authored: Wed Nov 16 23:59:24 2016 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Nov 17 11:22:50 2016 -0500 ---------------------------------------------------------------------- examples/c/proactor/libuv_proactor.c | 36 +++++++++++++++++++- examples/c/proactor/send.c | 55 +++++++++++++++++++++++-------- examples/c/proactor/test.py | 8 +++++ 3 files changed, 84 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/libuv_proactor.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c index 35afd5c..a26c311 100644 --- a/examples/c/proactor/libuv_proactor.c +++ b/examples/c/proactor/libuv_proactor.c @@ -157,6 +157,7 @@ struct pn_proactor_t { uv_cond_t cond; uv_loop_t loop; uv_async_t async; + uv_timer_t timer; /* Owner thread: proactor collector and batch can belong to leader or a worker */ pn_collector_t *collector; @@ -168,8 +169,11 @@ struct pn_proactor_t { queue worker_q; queue leader_q; size_t interrupt; /* pending interrupts */ + pn_millis_t timeout; size_t count; /* psocket count */ bool inactive:1; + bool timeout_request:1; + bool timeout_elapsed:1; bool has_leader:1; bool batch_working:1; /* batch belongs to a worker. */ }; @@ -551,6 +555,13 @@ static void on_write(uv_write_t* write, int err) { pc->writing = 0; /* Need to send a new write request */ } +static void on_timeout(uv_timer_t *timer) { + pn_proactor_t *p = (pn_proactor_t*)timer->data; + uv_mutex_lock(&p->lock); + p->timeout_elapsed = true; + uv_mutex_unlock(&p->lock); +} + // Read buffer allocation function for uv, just returns the transports read buffer. static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { pconnection_t *pc = (pconnection_t*)stream->data; @@ -587,6 +598,7 @@ static void leader_rewatch(psocket_t *ps) { } } +/* Set the event in the proactor's batch */ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) { pn_collector_put(p->collector, pn_proactor__class(), p, t); p->batch_working = true; @@ -604,6 +616,10 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { --p->interrupt; return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); } + if (p->timeout_elapsed) { + p->timeout_elapsed = false; + return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); + } } for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { if (ps->is_conn) { @@ -676,6 +692,14 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { /* Lead till there is work to do. */ p->has_leader = true; while (batch == NULL) { + if (p->timeout_request) { + p->timeout_request = false; + if (p->timeout) { + uv_timer_start(&p->timer, on_timeout, p->timeout, 0); + } else { + uv_timer_stop(&p->timer); + } + } for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { void (*action)(psocket_t*) = ps->action; void (*wakeup)(psocket_t*) = ps->wakeup; @@ -710,6 +734,14 @@ void pn_proactor_interrupt(pn_proactor_t *p) { uv_mutex_unlock(&p->lock); } +void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { + uv_mutex_lock(&p->lock); + p->timeout = t; + p->timeout_request = true; + uv_async_send(&p->async); /* Interrupt the UV loop */ + uv_mutex_unlock(&p->lock); +} + int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) { pconnection_t *pc = new_pconnection_t(p, false, host, port, extra); if (!pc) { @@ -765,7 +797,9 @@ pn_proactor_t *pn_proactor() { uv_loop_init(&p->loop); uv_mutex_init(&p->lock); uv_cond_init(&p->cond); - uv_async_init(&p->loop, &p->async, NULL); /* Just wake the loop */ + uv_async_init(&p->loop, &p->async, NULL); + uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */ + p->timer.data = p; return p; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c index d64ea2d..42facb0 100644 --- a/examples/c/proactor/send.c +++ b/examples/c/proactor/send.c @@ -44,6 +44,9 @@ typedef struct app_data_t { int sent; int acknowledged; pn_proactor_t *proactor; + pn_millis_t delay; + bool delaying; + pn_link_t *sender; bool finished; } app_data_t; @@ -91,6 +94,23 @@ static pn_bytes_t encode_message(app_data_t* app) { return pn_bytes(mbuf.size, mbuf.start); } +static void send(app_data_t* app) { + while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) { + ++app->sent; + // Use sent counter bytes as unique delivery tag. + pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); + pn_bytes_t msgbuf = encode_message(app); + pn_link_send(app->sender, msgbuf.start, msgbuf.size); + pn_link_advance(app->sender); + if (app->delay && app->sent < app->message_count) { + /* If delay is set, wait for TIMEOUT event to send more */ + app->delaying = true; + pn_proactor_set_timeout(app->proactor, app->delay); + break; + } + } +} + static void handle(app_data_t* app, pn_event_t* event) { switch (pn_event_type(event)) { @@ -105,18 +125,24 @@ static void handle(app_data_t* app, pn_event_t* event) { pn_link_open(l); } break; - case PN_LINK_FLOW: { - /* The peer has given us some credit, now we can send messages */ - pn_link_t *sender = pn_event_link(event); - while (pn_link_credit(sender) > 0 && app->sent < app->message_count) { - ++app->sent; - // Use sent counter bytes as unique delivery tag. - pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); - pn_bytes_t msgbuf = encode_message(app); - pn_link_send(sender, msgbuf.start, msgbuf.size); - pn_link_advance(sender); - } - } break; + case PN_LINK_FLOW: + /* The peer has given us some credit, now we can send messages */ + if (!app->delaying) { + app->sender = pn_event_link(event); + send(app); + } + break; + + case PN_PROACTOR_TIMEOUT: + /* Wake the sender's connection */ + pn_connection_wake(pn_session_connection(pn_link_session(app->sender))); + break; + + case PN_CONNECTION_WAKE: + /* Timeout, we can send more. */ + app->delaying = false; + send(app); + break; case PN_DELIVERY: { /* We received acknowledgedment from the peer that a message was delivered. */ @@ -158,7 +184,7 @@ static void handle(app_data_t* app, pn_event_t* event) { } static void usage(const char *arg0) { - fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0); + fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0); exit(1); } @@ -169,10 +195,11 @@ int main(int argc, char **argv) { const char* urlstr = NULL; int opt; - while((opt = getopt(argc, argv, "a:m:")) != -1) { + while((opt = getopt(argc, argv, "a:m:d:")) != -1) { switch(opt) { case 'a': urlstr = optarg; break; case 'm': app.message_count = atoi(optarg); break; + case 'd': app.delay = atoi(optarg); break; default: usage(argv[0]); break; } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2c8a3a3/examples/c/proactor/test.py ---------------------------------------------------------------------- diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py index 5dc3a99..a86425d 100644 --- a/examples/c/proactor/test.py +++ b/examples/c/proactor/test.py @@ -48,5 +48,13 @@ class CExampleTest(BrokerTestCase): self.assertEqual("100 messages sent and acknowledged\n", s.wait_out()) self.assertEqual(receive_expect(100), r.wait_out()) + def test_timed_send(self): + """Send with timed delay""" + s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"]) + self.assertEqual("3 messages sent and acknowledged\n", s.wait_out()) + r = self.proc(["libuv_receive", "-a", self.addr, "-m3"]) + self.assertEqual(receive_expect(3), r.wait_out()) + + if __name__ == "__main__": unittest.main() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
