PROTON-936: make the session outgoing window a fixed value that defaults to a very large number (2147483647) but can be configured if needed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a02ad90c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a02ad90c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a02ad90c Branch: refs/heads/cjansen-cpp-client Commit: a02ad90cab9af446e8251157a1525e3413776934 Parents: 32b00ae Author: Robert Gemmell <rob...@apache.org> Authored: Thu Jul 9 12:20:24 2015 +0100 Committer: Robert Gemmell <rob...@apache.org> Committed: Thu Jul 9 12:26:04 2015 +0100 ---------------------------------------------------------------------- proton-c/bindings/python/proton/__init__.py | 8 ++++++++ proton-c/include/proton/session.h | 16 +++++++++++++++ proton-c/src/engine/engine-internal.h | 1 + proton-c/src/engine/engine.c | 13 ++++++++++++ proton-c/src/transport/transport.c | 11 +--------- .../org/apache/qpid/proton/engine/Session.java | 8 ++++++++ .../qpid/proton/engine/impl/SessionImpl.java | 18 +++++++++++++++++ .../qpid/proton/engine/impl/TransportImpl.java | 2 +- .../proton/engine/impl/TransportSession.java | 21 ++------------------ proton-j/src/main/resources/cengine.py | 6 ++++++ tests/python/proton_tests/engine.py | 6 ++++++ 11 files changed, 80 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 9c75800..d5dcceb 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -2571,6 +2571,14 @@ class Session(Wrapper, Endpoint): incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) + def _get_outgoing_window(self): + return pn_session_get_outgoing_window(self._impl) + + def 
_set_outgoing_window(self, window): + pn_session_set_outgoing_window(self._impl, window) + + outgoing_window = property(_get_outgoing_window, _set_outgoing_window) + @property def outgoing_bytes(self): return pn_session_outgoing_bytes(self._impl) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/include/proton/session.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/session.h b/proton-c/include/proton/session.h index 5dedb99..94d2869 100644 --- a/proton-c/include/proton/session.h +++ b/proton-c/include/proton/session.h @@ -216,6 +216,22 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session); PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity); /** + * Get the outgoing window for a session object. + * + * @param[in] session the session object + * @return the outgoing window for the session + */ +PN_EXTERN size_t pn_session_get_outgoing_window(pn_session_t *session); + +/** + * Set the outgoing window for a session object. + * + * @param[in] session the session object + * @param[in] window the outgoing window for the session + */ +PN_EXTERN void pn_session_set_outgoing_window(pn_session_t *session, size_t window); + +/** * Get the number of outgoing bytes currently buffered by a session. 
* * @param[in] session a session object http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index c03a0a3..727f50d 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -246,6 +246,7 @@ struct pn_session_t { pn_sequence_t outgoing_bytes; pn_sequence_t incoming_deliveries; pn_sequence_t outgoing_deliveries; + pn_sequence_t outgoing_window; pn_session_state_t state; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index fda719a..ffbdf95 100644 --- a/proton-c/src/engine/engine.c +++ b/proton-c/src/engine/engine.c @@ -991,6 +991,7 @@ pn_session_t *pn_session(pn_connection_t *conn) ssn->outgoing_bytes = 0; ssn->incoming_deliveries = 0; ssn->outgoing_deliveries = 0; + ssn->outgoing_window = 2147483647; // begin transport state memset(&ssn->state, 0, sizeof(ssn->state)); @@ -1043,6 +1044,18 @@ void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity) ssn->incoming_capacity = capacity; } +size_t pn_session_get_outgoing_window(pn_session_t *ssn) +{ + assert(ssn); + return ssn->outgoing_window; +} + +void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window) +{ + assert(ssn); + ssn->outgoing_window = window; +} + size_t pn_session_outgoing_bytes(pn_session_t *ssn) { assert(ssn); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index d2c3509..9ce01bd 100644 --- a/proton-c/src/transport/transport.c +++ 
b/proton-c/src/transport/transport.c @@ -1909,16 +1909,7 @@ static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * val static size_t pni_session_outgoing_window(pn_session_t *ssn) { - uint32_t size = ssn->connection->transport->remote_max_frame; - if (!size) { - return ssn->outgoing_deliveries; - } else { - pn_sequence_t frames = ssn->outgoing_bytes/size; - if (ssn->outgoing_bytes % size) { - frames++; - } - return pn_max(frames, ssn->outgoing_deliveries); - } + return ssn->outgoing_window; } static size_t pni_session_incoming_window(pn_session_t *ssn) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java index f2f048a..2179dda 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java @@ -52,4 +52,12 @@ public interface Session extends Endpoint public int getOutgoingBytes(); + public long getOutgoingWindow(); + + /** + * Sets the outgoing window size. 
+ * + * @param outgoingWindowSize the outgoing window size + */ + public void setOutgoingWindow(long outgoingWindowSize); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java index 3af1820..9e108e4 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java @@ -44,6 +44,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession private int _outgoingBytes = 0; private int _incomingDeliveries = 0; private int _outgoingDeliveries = 0; + private long _outgoingWindow = Integer.MAX_VALUE; private LinkNode<SessionImpl> _node; @@ -270,4 +271,21 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession { getConnectionImpl().put(Event.Type.SESSION_LOCAL_CLOSE, this); } + + @Override + public void setOutgoingWindow(long outgoingWindow) { + if(outgoingWindow < 0 || outgoingWindow > 0xFFFFFFFFL) + { + throw new IllegalArgumentException("Value '" + outgoingWindow + "' must be in the" + + " range [0 - 2^32-1]"); + } + + _outgoingWindow = outgoingWindow; + } + + @Override + public long getOutgoingWindow() + { + return _outgoingWindow; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 595afd6..7d285b1 100644 --- 
a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -448,7 +448,7 @@ public class TransportImpl extends EndpointImpl Flow flow = new Flow(); flow.setNextIncomingId(ssn.getNextIncomingId()); flow.setNextOutgoingId(ssn.getNextOutgoingId()); - ssn.updateWindows(); + ssn.updateIncomingWindow(); flow.setIncomingWindow(ssn.getIncomingWindowSize()); flow.setOutgoingWindow(ssn.getOutgoingWindowSize()); if (link != null) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java index 1f4a9f8..33c6cd0 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java @@ -70,6 +70,7 @@ class TransportSession { _transport = transport; _session = session; + _outgoingWindowSize = UnsignedInteger.valueOf(session.getOutgoingWindow()); } void unbind() @@ -175,32 +176,14 @@ class TransportSession return _incomingWindowSize; } - public void updateWindows() + void updateIncomingWindow() { - // incoming window int size = _transport.getMaxFrameSize(); if (size <= 0) { _incomingWindowSize = UnsignedInteger.valueOf(2147483647); // biggest legal value } else { _incomingWindowSize = UnsignedInteger.valueOf((_session.getIncomingCapacity() - _session.getIncomingBytes())/size); } - - // outgoing window - int outgoingDeliveries = _session.getOutgoingDeliveries(); - if (size <= 0) { - _outgoingWindowSize = UnsignedInteger.valueOf(outgoingDeliveries); - } else { - int outgoingBytes = _session.getOutgoingBytes(); - int frames = outgoingBytes/size; 
- if (outgoingBytes % size > 0) { - frames++; - } - if (frames > outgoingDeliveries) { - _outgoingWindowSize = UnsignedInteger.valueOf(frames); - } else { - _outgoingWindowSize = UnsignedInteger.valueOf(outgoingDeliveries); - } - } } public UnsignedInteger getOutgoingDeliveryId() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/resources/cengine.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py index c94d023..ee2226d 100644 --- a/proton-j/src/main/resources/cengine.py +++ b/proton-j/src/main/resources/cengine.py @@ -280,6 +280,12 @@ def pn_session_incoming_bytes(ssn): def pn_session_outgoing_bytes(ssn): return ssn.impl.getOutgoingBytes() +def pn_session_get_outgoing_window(ssn): + return ssn.impl.getOutgoingWindow() + +def pn_session_set_outgoing_window(ssn, window): + ssn.impl.setOutgoingWindow(window) + def pn_session_condition(ssn): return ssn.condition http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index 258665d..c18683f 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -506,6 +506,12 @@ class SessionTest(Test): assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + def test_set_get_outgoing_window(self): + assert self.ssn.outgoing_window == 2147483647 + + self.ssn.outgoing_window = 1024 + assert self.ssn.outgoing_window == 1024 + class LinkTest(Test): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org