Repository: qpid-proton Updated Branches: refs/heads/master 175a15a87 -> e38957ae5
PROTON-842: enforce channel_max limit on session creation Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e38957ae Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e38957ae Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e38957ae Branch: refs/heads/master Commit: e38957ae5115ec023993672ca5b7d5e3df414f7e Parents: 175a15a Author: Mick Goulish <m...@redhat.com> Authored: Thu Jun 18 08:45:47 2015 -0400 Committer: Mick Goulish <m...@redhat.com> Committed: Thu Jun 18 08:45:47 2015 -0400 ---------------------------------------------------------------------- proton-c/bindings/python/proton/__init__.py | 7 +- proton-c/include/proton/cproton.i | 2 - proton-c/include/proton/transport.h | 16 +++- proton-c/src/engine/engine-internal.h | 14 ++- proton-c/src/engine/engine.c | 16 +++- proton-c/src/transport/transport.c | 111 ++++++++++++++++++++--- tests/python/proton_tests/engine.py | 44 ++++++++- 7 files changed, 191 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 9432bd8..5860764 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -2484,7 +2484,11 @@ class Connection(Wrapper, Endpoint): """ Returns a new session on this connection. """ - return Session(pn_session(self._impl)) + ssn = pn_session(self._impl) + if ssn is None: + raise(SessionException("Session allocation failed.")) + else: + return Session(ssn) def session_head(self, mask): return Session.wrap(pn_session_head(self._impl, mask)) @@ -3987,6 +3991,7 @@ __all__ = [ "SASL", "Sender", "Session", + "SessionException", "SSL", "SSLDomain", "SSLSessionDetails", http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/include/proton/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i index ac2b121..b55211f 100644 --- a/proton-c/include/proton/cproton.i +++ b/proton-c/include/proton/cproton.i @@ -210,8 +210,6 @@ typedef unsigned long int uintptr_t; { require: connection != NULL; - ensure: - pn_session != NULL; } %contract pn_transport(pn_connection_t *connection) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/include/proton/transport.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h index a3ca667..483f5a9 100644 --- a/proton-c/include/proton/transport.h +++ b/proton-c/include/proton/transport.h @@ -320,6 +320,10 @@ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ... /** * Get the maximum allowed channel for a transport. + * This will be the minimum of + * 1. limit imposed by this proton implementation + * 2. limit imposed by remote peer + * 3. limit imposed by this application, using pn_transport_set_channel_max() * * @param[in] transport a transport object * @return the maximum allowed channel @@ -327,7 +331,17 @@ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ... PN_EXTERN uint16_t pn_transport_get_channel_max(pn_transport_t *transport); /** - * Set the maximum allowed channel for a transport. + * Set the maximum allowed channel number for a transport. + * Note that this is the maximum channel number allowed, giving a + * valid channel number range of [0..channel_max]. Therefore the + * maximum number of simultaineously active channels will be + * channel_max plus 1. + * You can call this function more than once to raise and lower + * the limit your application imposes on max channels for this + * transport. However, smaller limits may be imposed by this + * library, or by the remote peer. + * After the OPEN frame has been sent to the remote peer, + * further calls to this function will have no effect. * * @param[in] transport a transport object * @param[in] channel_max the maximum allowed channel http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 4c72310..c03a0a3 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -178,8 +178,20 @@ struct pn_transport_t { pn_trace_t trace; - uint16_t channel_max; + /* + * The maximum channel number can be constrained in several ways: + * 1. an unchangeable limit imposed by this library code + * 2. a limit imposed by the remote peer when the connection is opened, + * which this app must honor + * 3. a limit imposed by this app, which may be raised and lowered + * until the OPEN frame is sent. + * These constraints are all summed up in channel_max, below. + */ + #define PN_IMPL_CHANNEL_MAX 32767 + uint16_t local_channel_max; uint16_t remote_channel_max; + uint16_t channel_max; + bool freed; bool open_sent; bool open_rcvd; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index c5228a5..936cf60 100644 --- a/proton-c/src/engine/engine.c +++ b/proton-c/src/engine/engine.c @@ -953,12 +953,26 @@ static void pn_session_finalize(void *object) pn_session_t *pn_session(pn_connection_t *conn) { assert(conn); + + + pn_transport_t * transport = pn_connection_transport(conn); + + if(transport) { + // channel_max is an index, not a count. + if(pn_hash_size(transport->local_channels) > (size_t)transport->channel_max) { + pn_transport_logf(transport, + "pn_session: too many sessions: %d channel_max is %d", + pn_hash_size(transport->local_channels), + transport->channel_max); + return (pn_session_t *) 0; + } + } + #define pn_session_free pn_object_free static const pn_class_t clazz = PN_METACLASS(pn_session); #undef pn_session_free pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t)); if (!ssn) return NULL; - pn_endpoint_init(&ssn->endpoint, SESSION, conn); pni_add_session(conn, ssn); ssn->links = pn_list(PN_WEAKREF, 0); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index 733f695..ff80e21 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -43,6 +43,33 @@ static ssize_t transport_consume(pn_transport_t *transport); // delivery buffers +/* + * Call this any time anything happens that may affect channel_max: + * i.e. when the app indicates a preference, or when we receive the + * OPEN frame from the remote peer. And call it to do the final + * calculation just before we communicate our limit to the remote + * peer by sending our OPEN frame. + */ +static void pni_calculate_channel_max(pn_transport_t *transport) { + /* + * The application cannot make the limit larger than + * what this library will allow. + */ + transport->channel_max = (PN_IMPL_CHANNEL_MAX < transport->local_channel_max) + ? PN_IMPL_CHANNEL_MAX + : transport->local_channel_max; + + /* + * The remote peer's constraint is not valid until the + * peer's open frame has been received. + */ + if(transport->open_rcvd) { + transport->channel_max = (transport->channel_max < transport->remote_channel_max) + ? transport->channel_max + : transport->remote_channel_max; + } +} + void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next) { db->deliveries = pn_hash(PN_WEAKREF, 0, 0.75); @@ -370,7 +397,23 @@ static void pn_transport_initialize(void *object) transport->remote_hostname = NULL; transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE; transport->remote_max_frame = 0; - transport->channel_max = 0; + + /* + * We set the local limit on channels to 2^15, because + * parts of the code use the topmost bit (of a short) + * as a flag. + * The peer that this transport connects to may also + * place its own limit on max channel number, and the + * application may also set a limit. + * The maximum that we use will be the minimum of all + * these constraints. + */ + // There is no constraint yet from remote peer, + // so set to max possible. + transport->remote_channel_max = 65535; + transport->local_channel_max = PN_IMPL_CHANNEL_MAX; + transport->channel_max = transport->local_channel_max; + transport->remote_channel_max = 0; transport->local_idle_timeout = 0; transport->dead_remote_deadline = 0; @@ -1098,6 +1141,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, transport->halt = true; } transport->open_rcvd = true; + pni_calculate_channel_max(transport); return 0; } @@ -1109,6 +1153,18 @@ int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next); if (err) return err; + // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max -- + // express our displeasure by closing the connection with a framing error. + if (remote_channel > transport->channel_max) { + pn_do_error(transport, + "amqp:connection:framing-error", + "remote channel %d is above negotiated channel_max %d.", + remote_channel, + transport->channel_max + ); + return PN_TRANSPORT_ERROR; + } + pn_session_t *ssn; if (reply) { // XXX: what if session is NULL? @@ -1774,6 +1830,7 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp : 0; pn_connection_t *connection = (pn_connection_t *) endpoint; const char *cid = pn_string_get(connection->container); + pni_calculate_channel_max(transport); int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[SS?I?H?InnCCC]", OPEN, cid ? cid : "", pn_string_get(connection->hostname), @@ -1792,15 +1849,16 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp return 0; } -static uint16_t allocate_alias(pn_hash_t *aliases) +static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * valid) { - for (uint32_t i = 0; i < 65536; i++) { + for (uint32_t i = 0; i <= max_index; i++) { if (!pn_hash_get(aliases, i)) { + * valid = 1; return i; } } - assert(false); + * valid = 0; return 0; } @@ -1828,14 +1886,19 @@ static size_t pni_session_incoming_window(pn_session_t *ssn) } } -static void pni_map_local_channel(pn_session_t *ssn) +static int pni_map_local_channel(pn_session_t *ssn) { pn_transport_t *transport = ssn->connection->transport; pn_session_state_t *state = &ssn->state; - uint16_t channel = allocate_alias(transport->local_channels); + int valid; + uint16_t channel = allocate_alias(transport->local_channels, transport->channel_max, & valid); + if (!valid) { + return 0; + } state->local_channel = channel; pn_hash_put(transport->local_channels, channel, ssn); pn_ep_incref(&ssn->endpoint); + return 1; } static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) @@ -1846,7 +1909,10 @@ static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpo pn_session_state_t *state = &ssn->state; if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1) { - pni_map_local_channel(ssn); + if (! pni_map_local_channel(ssn)) { + pn_transport_logf(transport, "unable to find an open available channel within limit of %d", transport->channel_max ); + return PN_ERR; + } state->incoming_window = pni_session_incoming_window(ssn); state->outgoing_window = pni_session_outgoing_window(ssn); pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?HIII]", BEGIN, @@ -1876,12 +1942,17 @@ static const char *expiry_symbol(pn_expiry_policy_t policy) return NULL; } -static void pni_map_local_handle(pn_link_t *link) { +static int pni_map_local_handle(pn_link_t *link) { pn_link_state_t *state = &link->state; pn_session_state_t *ssn_state = &link->session->state; - state->local_handle = allocate_alias(ssn_state->local_handles); + int valid; + // XXX TODO MICK: once changes are made to handle_max, change this hardcoded value to something reasonable. + state->local_handle = allocate_alias(ssn_state->local_handles, 65536, & valid); + if ( ! valid ) + return 0; pn_hash_put(ssn_state->local_handles, state->local_handle, link); pn_ep_incref(&link->endpoint); + return 1; } static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) @@ -2574,9 +2645,27 @@ uint16_t pn_transport_get_channel_max(pn_transport_t *transport) return transport->channel_max; } -void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t channel_max) +void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t requested_channel_max) { - transport->channel_max = channel_max; + /* + * Once the OPEN frame has been sent, we have communicated our + * wishes to the remote client and there is no way to renegotiate. + * After that point, we do not allow the application to make changes. + * Before that point, however, the app is free to either raise or + * lower our local limit. (But the app cannot raise it above the + * limit imposed by this library.) + * The channel-max value will be finalized just before the OPEN frame + * is sent. + */ + if(transport->open_sent) { + pn_transport_logf(transport, "Cannot change local channel-max after OPEN frame sent."); + } + else { + transport->local_channel_max = (requested_channel_max < PN_IMPL_CHANNEL_MAX) + ? requested_channel_max + : PN_IMPL_CHANNEL_MAX; + pni_calculate_channel_max(transport); + } } uint16_t pn_transport_remote_channel_max(pn_transport_t *transport) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index 924b3bc..1563889 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -211,11 +211,51 @@ class ConnectionTest(Test): assert self.c2.remote_properties == p1, (self.c2.remote_properties, p1) assert self.c1.remote_properties == p2, (self.c2.remote_properties, p2) - def test_channel_max(self, value=1234): + # The proton implementation limits channel_max to 32767. + # If I set the application's limit lower than that, I should + # get my wish. If I set it higher -- not. + def test_channel_max_low(self, value=1234): self.c1.transport.channel_max = value self.c1.open() self.pump() - assert self.c2.transport.remote_channel_max == value, (self.c2.transport.remote_channel_max, value) + assert self.c1.transport.channel_max == value, (self.c1.transport.channel_max, value) + + def test_channel_max_high(self, value=33333): + self.c1.transport.channel_max = value + self.c1.open() + self.pump() + assert self.c1.transport.channel_max == 32767, (self.c1.transport.channel_max, value) + + def test_channel_max_raise_and_lower(self): + # It's OK to lower the max below 32767. + self.c1.transport.channel_max = 12345 + assert self.c1.transport.channel_max == 12345 + # But it won't let us raise the limit above 32767. + self.c1.transport.channel_max = 33333 + assert self.c1.transport.channel_max == 32767 + self.c1.open() + self.pump() + # Now it's too late to make any change, because + # we have already sent the OPEN frame. + self.c1.transport.channel_max = 666 + assert self.c1.transport.channel_max == 32767 + + + def test_channel_max_limits_sessions(self): + return + # This is an index -- so max number of channels should be 1. + self.c1.transport.channel_max = 0 + self.c1.open() + self.c2.open() + ssn_0 = self.c2.session() + assert ssn_0 != None + ssn_0.open() + self.pump() + try: + ssn_1 = self.c2.session() + assert False, "expected session exception" + except SessionException: + pass def test_cleanup(self): self.c1.open() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org