PROTON-1809: PROTON-636: Unable to receive messages when max-frame-size > 2^20
Caused when the frame size was greater than the default session-capacity, so the incoming window was always 0. Fixes: 1. No default session-capacity. Session flow control is enabled only if both session-capacity and max-frame-size are set. Neither value is deduced automatically. 2. Transport error if both are set and session-capacity is less than max-frame-size. In this case the incoming window is always 0, so communication is impossible. 3. Update API doc for pn_session_set_incoming_capacity. 4. Add tests to verify this behavior. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e828055b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e828055b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e828055b Branch: refs/heads/go1 Commit: e828055b2a0fade8835cd597e91d0ae3c8bb3f5a Parents: c7717a4 Author: Alan Conway <acon...@redhat.com> Authored: Tue Apr 10 10:31:56 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Tue Apr 10 13:28:24 2018 -0400 ---------------------------------------------------------------------- c/examples/receive.c | 2 +- c/include/proton/message.h | 3 ++ c/include/proton/session.h | 11 ++++--- c/src/core/engine.c | 5 ++-- c/src/core/framing.h | 1 + c/src/core/transport.c | 20 +++++++++---- c/tests/connection_driver.c | 64 +++++++++++++++++++++++++++++++++++++++- c/tests/test_handler.h | 12 ++++++++ c/tests/test_tools.h | 8 ++++- 9 files changed, 110 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/examples/receive.c ---------------------------------------------------------------------- diff --git a/c/examples/receive.c b/c/examples/receive.c index 0d0c988..8280345 100644 --- a/c/examples/receive.c +++ b/c/examples/receive.c @@ -93,7 +93,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { } break; case PN_DELIVERY: 
{ - /* A message has been received */ + /* A message (or part of a message) has been received */ pn_delivery_t *d = pn_event_delivery(event); if (pn_delivery_readable(d)) { pn_link_t *l = pn_delivery_link(d); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/include/proton/message.h ---------------------------------------------------------------------- diff --git a/c/include/proton/message.h b/c/include/proton/message.h index 0f094be..d7b9663 100644 --- a/c/include/proton/message.h +++ b/c/include/proton/message.h @@ -748,6 +748,9 @@ struct pn_link_t; * - call pn_link_send() to send the encoded message bytes * - call pn_link_advance() to indicate the message is complete * + * Note: you must create a delivery for the message before calling + * pn_message_send() see pn_delivery() + * * @param[in] msg A message object. * @param[in] sender A sending link. * The message will be encoded and sent with pn_link_send() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/include/proton/session.h ---------------------------------------------------------------------- diff --git a/c/include/proton/session.h b/c/include/proton/session.h index cabb1f2..512e004 100644 --- a/c/include/proton/session.h +++ b/c/include/proton/session.h @@ -198,9 +198,7 @@ PN_EXTERN void pn_session_close(pn_session_t *session); * Get the incoming capacity of the session measured in bytes. * * The incoming capacity of a session determines how much incoming - * message data the session will buffer. Note that if this value is - * less than the negotiated frame size of the transport, it will be - * rounded up to one full frame. + * message data the session will buffer. * * @param[in] session the session object * @return the incoming capacity of the session in bytes @@ -211,9 +209,10 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session); * Set the incoming capacity for a session object. 
* * The incoming capacity of a session determines how much incoming - * message data the session will buffer. Note that if this value is - * less than the negotiated frame size of the transport, it will be - * rounded up to one full frame. + * message data the session will buffer. + * + * NOTE: If set, this value must be greater than or equal to the negotiated + * frame size of the transport. * * @param[in] session the session object * @param[in] capacity the incoming capacity for the session http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/engine.c ---------------------------------------------------------------------- diff --git a/c/src/core/engine.c b/c/src/core/engine.c index f49886d..070c751 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -20,6 +20,7 @@ */ #include "engine-internal.h" +#include "framing.h" #include <stdlib.h> #include <string.h> #include "protocol.h" @@ -987,12 +988,12 @@ pn_session_t *pn_session(pn_connection_t *conn) ssn->links = pn_list(PN_WEAKREF, 0); ssn->freed = pn_list(PN_WEAKREF, 0); ssn->context = pn_record(); - ssn->incoming_capacity = 1024*1024; + ssn->incoming_capacity = 0; ssn->incoming_bytes = 0; ssn->outgoing_bytes = 0; ssn->incoming_deliveries = 0; ssn->outgoing_deliveries = 0; - ssn->outgoing_window = 2147483647; + ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE; // begin transport state memset(&ssn->state, 0, sizeof(ssn->state)); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/framing.h ---------------------------------------------------------------------- diff --git a/c/src/core/framing.h b/c/src/core/framing.h index 792d664..92c1f7d 100644 --- a/c/src/core/framing.h +++ b/c/src/core/framing.h @@ -30,6 +30,7 @@ #define AMQP_HEADER_SIZE (8) #define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame +#define AMQP_MAX_WINDOW_SIZE (2147483647) typedef struct { uint8_t type; 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/transport.c ---------------------------------------------------------------------- diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 96b54f2..1a05261 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1893,11 +1893,21 @@ static size_t pni_session_outgoing_window(pn_session_t *ssn) static size_t pni_session_incoming_window(pn_session_t *ssn) { - uint32_t size = ssn->connection->transport->local_max_frame; - if (!size) { - return 2147483647; // biggest legal value - } else { - return (ssn->incoming_capacity - ssn->incoming_bytes)/size; + pn_transport_t *t = ssn->connection->transport; + uint32_t size = t->local_max_frame; + size_t capacity = ssn->incoming_capacity; + if (!size || !capacity) { /* session flow control is not enabled */ + return AMQP_MAX_WINDOW_SIZE; + } else if (capacity >= size) { /* precondition */ + return (capacity - ssn->incoming_bytes) / size; + } else { /* error: we will never have a non-zero window */ + pn_condition_format( + pn_transport_condition(t), + "amqp:internal-error", + "session capacity %"PN_ZU" is less than frame size %"PN_ZU, + capacity, size); + pn_transport_close_tail(t); + return 0; } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/connection_driver.c ---------------------------------------------------------------------- diff --git a/c/tests/connection_driver.c b/c/tests/connection_driver.c index 62655a7..5997f52 100644 --- a/c/tests/connection_driver.c +++ b/c/tests/connection_driver.c @@ -379,6 +379,68 @@ static void test_message_abort_mixed(test_t *t) { test_connection_driver_destroy(&server); } +/* Set capacity and max frame, send a single message */ +static void set_capacity_and_max_frame( + size_t capacity, size_t max_frame, + test_connection_driver_t *client, test_connection_driver_t *server, + const char* data) +{ + pn_transport_set_max_frame(client->driver.transport, max_frame); + 
pn_connection_open(client->driver.connection); + pn_session_t *ssn = pn_session(client->driver.connection); + pn_session_set_incoming_capacity(ssn, capacity); + pn_session_open(ssn); + pn_link_t *snd = pn_sender(ssn, "x"); + pn_link_open(snd); + test_connection_drivers_run(client, server); + pn_link_flow(server->handler.link, 1); + test_connection_drivers_run(client, server); + if (pn_transport_closed(client->driver.transport) || + pn_transport_closed(server->driver.transport)) + return; + /* Send a message */ + pn_message_t *m = pn_message(); + pn_message_set_address(m, data); + pn_delivery(snd, PN_BYTES_LITERAL(x)); + pn_message_send(m, snd, NULL); + pn_message_free(m); + test_connection_drivers_run(client, server); +} + +#define MAX_WINDOW (2147483647) +#define MAX_FRAME (4294967295) +/* Test different settings for max-frame, outgoing-window, incoming-capacity */ +static void test_session_flow_control(test_t *t) { + test_connection_driver_t client, server; + test_connection_drivers_init(t, &client, open_handler, &server, delivery_handler); + pn_message_t *m = pn_message(); + pn_rwbytes_t buf= {0}; + + /* Capacity equal to frame size OK */ + set_capacity_and_max_frame(1234, 1234, &client, &server, "foo"); + pn_delivery_t *dlv = server.handler.delivery; + if (TEST_CHECK(t, dlv)) { + message_decode(m, dlv, &buf); + TEST_STR_EQUAL(t, "foo", pn_message_get_address(m)); + } + + /* Capacity bigger than frame size OK */ + set_capacity_and_max_frame(12345, 1234, &client, &server, "foo"); + dlv = server.handler.delivery; + if (TEST_CHECK(t, dlv)) { + message_decode(m, dlv, &buf); + TEST_STR_EQUAL(t, "foo", pn_message_get_address(m)); + } + + /* Capacity smaller than frame size is an error */ + set_capacity_and_max_frame(1234, 12345, &client, &server, "foo"); + TEST_COND_NAME(t, "amqp:internal-error", pn_transport_condition(client.driver.transport)); + TEST_COND_DESC(t, "session capacity 1234 is less than frame size 12345", pn_transport_condition(client.driver.transport)); 
+ + pn_message_free(m); + free(buf.start); + test_connection_drivers_destroy(&client, &server); +} int main(int argc, char **argv) { int failed = 0; @@ -386,6 +448,6 @@ int main(int argc, char **argv) { RUN_ARGV_TEST(failed, t, test_message_stream(&t)); RUN_ARGV_TEST(failed, t, test_message_abort(&t)); RUN_ARGV_TEST(failed, t, test_message_abort_mixed(&t)); + RUN_ARGV_TEST(failed, t, test_session_flow_control(&t)); return failed; - } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/test_handler.h ---------------------------------------------------------------------- diff --git a/c/tests/test_handler.h b/c/tests/test_handler.h index 108d0d9..7ffd4d3 100644 --- a/c/tests/test_handler.h +++ b/c/tests/test_handler.h @@ -46,6 +46,7 @@ typedef struct test_handler_t { pn_link_t *sender; pn_link_t *receiver; pn_delivery_t *delivery; + pn_message_t *message; pn_ssl_domain_t *ssl_domain; } test_handler_t; @@ -167,4 +168,15 @@ test_connection_driver_t* test_connection_drivers_run(test_connection_driver_t * return NULL; } +/* Initialize a client-server driver pair */ +void test_connection_drivers_init(test_t *t, test_connection_driver_t *a, test_handler_fn fa, test_connection_driver_t *b, test_handler_fn fb) { + test_connection_driver_init(a, t, fa, NULL); + test_connection_driver_init(b, t, fb, NULL); + pn_transport_set_server(b->driver.transport); +} + +void test_connection_drivers_destroy(test_connection_driver_t *a, test_connection_driver_t *b) { + test_connection_driver_destroy(a); + test_connection_driver_destroy(b); +} #endif // TESTS_TEST_DRIVER_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/test_tools.h ---------------------------------------------------------------------- diff --git a/c/tests/test_tools.h b/c/tests/test_tools.h index d046a43..7596d60 100644 --- a/c/tests/test_tools.h +++ b/c/tests/test_tools.h @@ -148,6 +148,11 @@ bool test_int_equal_(test_t *t, int want, int got, const char *file, int line) { } 
#define TEST_INT_EQUAL(TEST, WANT, GOT) test_int_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__) +bool test_size_equal_(test_t *t, size_t want, size_t got, const char *file, int line) { + return test_check_(t, want == got, NULL, file, line, "want %zd, got %zd", want, got); +} +#define TEST_SIZE_EQUAL(TEST, WANT, GOT) test_size_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__) + bool test_str_equal_(test_t *t, const char* want, const char* got, const char *file, int line) { return test_check_(t, !strcmp(want, got), NULL, file, line, "want '%s', got '%s'", want, got); } @@ -231,7 +236,8 @@ void message_decode(pn_message_t *m, pn_delivery_t *d, pn_rwbytes_t *buf) { pn_link_t *l = pn_delivery_link(d); ssize_t size = pn_delivery_pending(d); rwbytes_ensure(buf, size); - TEST_ASSERT(size == pn_link_recv(l, buf->start, size)); + ssize_t result = pn_link_recv(l, buf->start, size); + TEST_ASSERTF(size == result, "%ld != %ld", (long)size, (long)result); pn_message_clear(m); TEST_ASSERTF(!pn_message_decode(m, buf->start, size), "decode: %s", pn_error_text(pn_message_error(m))); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org