http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py deleted file mode 100644 index c0ea31d..0000000 --- a/tests/python/proton_tests/engine.py +++ /dev/null @@ -1,2714 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from __future__ import absolute_import - -import os, gc -import sys -from . import common -from time import time, sleep -from proton import * -from .common import pump, Skipped -from proton.reactor import Reactor - - -# older versions of gc do not provide the garbage list -if not hasattr(gc, "garbage"): - gc.garbage=[] - -# future test areas -# + different permutations of setup -# - creating deliveries and calling input/output before opening the session/link -# + shrinking output_size down to something small? should the engine buffer? -# + resuming -# - locally and remotely created deliveries with the same tag - -# Jython 2.5 needs this: -try: - bytes() -except: - bytes = str - -# and this... -try: - bytearray() -except: - def bytearray(x): - return b'\x00' * x - -OUTPUT_SIZE = 10*1024 - -class Test(common.Test): - - def __init__(self, *args): - common.Test.__init__(self, *args) - self._wires = [] - - def connection(self): - c1 = Connection() - c2 = Connection() - t1 = Transport() - t1.bind(c1) - t2 = Transport() - t2.bind(c2) - self._wires.append((c1, t1, c2, t2)) - - mask1 = 0 - mask2 = 0 - - for cat in ("TRACE_FRM", "TRACE_RAW"): - trc = os.environ.get("PN_%s" % cat) - if trc and trc.lower() in ("1", "2", "yes", "true"): - mask1 = mask1 | getattr(Transport, cat) - if trc == "2": - mask2 = mask2 | getattr(Transport, cat) - t1.trace(mask1) - t2.trace(mask2) - - return c1, c2 - - def link(self, name, max_frame=None, idle_timeout=None): - c1, c2 = self.connection() - if max_frame: - c1.transport.max_frame_size = max_frame[0] - c2.transport.max_frame_size = max_frame[1] - if idle_timeout: - # idle_timeout in seconds expressed as float - c1.transport.idle_timeout = idle_timeout[0] - c2.transport.idle_timeout = idle_timeout[1] - c1.open() - c2.open() - ssn1 = c1.session() - ssn1.open() - self.pump() - ssn2 = c2.session_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE) - ssn2.open() - self.pump() - snd = ssn1.sender(name) - rcv = ssn2.receiver(name) - return snd, rcv - - def cleanup(self): - self._wires = [] - - def pump(self, buffer_size=OUTPUT_SIZE): - for c1, t1, c2, t2 in self._wires: - pump(t1, t2, buffer_size) - -class ConnectionTest(Test): - - def setUp(self): - gc.enable() - self.c1, self.c2 = self.connection() - - def cleanup(self): - # release resources created by this class - super(ConnectionTest, self).cleanup() - self.c1 = None - self.c2 = None - - def tearDown(self): - self.cleanup() - gc.collect() - assert not gc.garbage - - def test_open_close(self): - assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - self.c1.open() - self.pump() - - assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE - - self.c2.open() - self.pump() - - assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.c1.close() - self.pump() - - assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - self.c2.close() - self.pump() - - assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - def test_simultaneous_open_close(self): - assert self.c1.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - assert self.c2.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - self.c1.open() - self.c2.open() - - self.pump() - - assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.c1.close() - self.c2.close() - - self.pump() - - assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert self.c2.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - def test_capabilities(self): - self.c1.offered_capabilities = Array(UNDESCRIBED, Data.SYMBOL, - symbol("O_one"), - symbol("O_two"), - symbol("O_three")) - - self.c1.desired_capabilities = Array(UNDESCRIBED, Data.SYMBOL, - symbol("D_one"), - symbol("D_two"), - symbol("D_three")) - self.c1.open() - - assert self.c2.remote_offered_capabilities is None - assert self.c2.remote_desired_capabilities is None - - self.pump() - - assert self.c2.remote_offered_capabilities == self.c1.offered_capabilities, \ - (self.c2.remote_offered_capabilities, self.c1.offered_capabilities) - assert self.c2.remote_desired_capabilities == self.c1.desired_capabilities, \ - (self.c2.remote_desired_capabilities, self.c1.desired_capabilities) - - def test_condition(self): - self.c1.open() - self.c2.open() - self.pump() - assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"}) - self.c1.condition = cond - self.c1.close() - - self.pump() - - assert self.c1.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - rcond = self.c2.remote_condition - assert rcond == cond, (rcond, cond) - - def test_properties(self, p1={symbol("key"): symbol("value")}, p2=None): - self.c1.properties = p1 - self.c2.properties = p2 - self.c1.open() - self.c2.open() - self.pump() - - assert self.c2.remote_properties == p1, (self.c2.remote_properties, p1) - assert self.c1.remote_properties == p2, (self.c2.remote_properties, p2) - - # The proton implementation limits channel_max to 32767. - # If I set the application's limit lower than that, I should - # get my wish. If I set it higher -- not. - def test_channel_max_low(self, value=1234): - self.c1.transport.channel_max = value - self.c1.open() - self.pump() - assert self.c1.transport.channel_max == value, (self.c1.transport.channel_max, value) - - def test_channel_max_high(self, value=65535): - self.c1.transport.channel_max = value - self.c1.open() - self.pump() - assert self.c1.transport.channel_max == 32767, (self.c1.transport.channel_max, value) - - def test_channel_max_raise_and_lower(self): - upper_limit = 32767 - - # It's OK to lower the max below upper_limit. - self.c1.transport.channel_max = 12345 - assert self.c1.transport.channel_max == 12345 - - # But it won't let us raise the limit above PN_IMPL_CHANNEL_MAX. - self.c1.transport.channel_max = 65535 - assert self.c1.transport.channel_max == upper_limit - - # send the OPEN frame - self.c1.open() - self.pump() - - # Now it's too late to make any change, because - # we have already sent the OPEN frame. - try: - self.c1.transport.channel_max = 666 - assert False, "expected session exception" - except: - pass - - assert self.c1.transport.channel_max == upper_limit - - - # TODO: Currently failing test - PROTON-1759 - skip - def test_channel_max_limits_sessions(self): - raise Skipped('Test fails - PROTON-1759') - # This is an index -- so max number of channels should be 1. - self.c1.transport.channel_max = 0 - self.c1.open() - self.c2.open() - ssn_0 = self.c2.session() - assert ssn_0 != None - ssn_0.open() - self.pump() - try: - ssn_1 = self.c2.session() - ssn_1.open() - self.pump() - assert False, "expected session exception" - except SessionException: - pass - - def test_cleanup(self): - self.c1.open() - self.c2.open() - self.pump() - assert self.c1.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - t1 = self.c1.transport - t2 = self.c2.transport - c2 = self.c2 - self.c1.close() - # release all references to C1, except that held by the transport - self.cleanup() - gc.collect() - # transport should flush last state from C1: - pump(t1, t2) - assert c2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - def test_user_config(self): - self.c1.user = "vindaloo" - self.c1.password = "secret" - self.c1.open() - self.pump() - - self.c2.user = "leela" - self.c2.password = "trustno1" - self.c2.open() - self.pump() - - assert self.c1.user == "vindaloo", self.c1.user - assert self.c1.password == None, self.c1.password - assert self.c2.user == "leela", self.c2.user - assert self.c2.password == None, self.c2.password - -class SessionTest(Test): - - def setUp(self): - gc.enable() - self.c1, self.c2 = self.connection() - self.ssn = self.c1.session() - self.c1.open() - self.c2.open() - - def cleanup(self): - # release resources created by this class - super(SessionTest, self).cleanup() - self.c1 = None - self.c2 = None - self.ssn = None - - def tearDown(self): - self.cleanup() - gc.collect() - assert not gc.garbage - - def test_open_close(self): - assert self.ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - self.ssn.open() - - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - - self.pump() - - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - - ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT) - - assert ssn != None - assert ssn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - - ssn.open() - - assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - - self.pump() - - assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - ssn.close() - - assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.pump() - - assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - self.ssn.close() - - assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - self.pump() - - assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - def test_simultaneous_close(self): - self.ssn.open() - self.pump() - ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT) - assert ssn != None - ssn.open() - self.pump() - - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.ssn.close() - ssn.close() - - assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - - self.pump() - - assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - def test_closing_connection(self): - self.ssn.open() - self.pump() - self.c1.close() - self.pump() - self.ssn.close() - self.pump() - - def test_condition(self): - self.ssn.open() - self.pump() - ssn = self.c2.session_head(Endpoint.REMOTE_ACTIVE | Endpoint.LOCAL_UNINIT) - assert ssn != None - ssn.open() - self.pump() - - assert self.ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"}) - self.ssn.condition = cond - self.ssn.close() - - self.pump() - - assert self.ssn.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - rcond = ssn.remote_condition - assert rcond == cond, (rcond, cond) - - def test_cleanup(self): - snd, rcv = self.link("test-link") - snd.open() - rcv.open() - self.pump() - snd_ssn = snd.session - rcv_ssn = rcv.session - assert rcv_ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - self.ssn = None - snd_ssn.close() - snd_ssn.free() - del snd_ssn - gc.collect() - self.pump() - assert rcv_ssn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - def test_reopen_on_same_session_without_free(self): - """ - confirm that a link is correctly opened when attaching to a previously - closed link *that has not been freed yet* on the same session - """ - self.ssn.open() - self.pump() - - ssn2 = self.c2.session_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE) - ssn2.open() - self.pump() - snd = self.ssn.sender("test-link") - rcv = ssn2.receiver("test-link") - - assert snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - assert rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - snd.open() - rcv.open() - self.pump() - - assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - snd.close() - rcv.close() - self.pump() - - assert snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - snd = self.ssn.sender("test-link") - rcv = ssn2.receiver("test-link") - assert snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - assert rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - snd.open() - rcv.open() - self.pump() - - assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - def test_set_get_outgoing_window(self): - assert self.ssn.outgoing_window == 2147483647 - - self.ssn.outgoing_window = 1024 - assert self.ssn.outgoing_window == 1024 - - -class LinkTest(Test): - - def setUp(self): - gc.enable() - self.snd, self.rcv = self.link("test-link") - - def cleanup(self): - # release resources created by this class - super(LinkTest, self).cleanup() - self.snd = None - self.rcv = None - - def tearDown(self): - self.cleanup() - gc.collect() - assert not gc.garbage, gc.garbage - - def test_open_close(self): - assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - self.snd.open() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE - - self.rcv.open() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.snd.close() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - self.rcv.close() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - def test_simultaneous_open_close(self): - assert self.snd.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - assert self.rcv.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT - - self.snd.open() - self.rcv.open() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_UNINIT - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.snd.close() - self.rcv.close() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED - - def test_multiple(self): - rcv = self.snd.session.receiver("second-rcv") - assert rcv.name == "second-rcv" - self.snd.open() - rcv.open() - self.pump() - c2 = self.rcv.session.connection - l = c2.link_head(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE) - while l: - l.open() - l = l.next(Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE) - self.pump() - - assert self.snd - assert rcv - self.snd.close() - rcv.close() - ssn = rcv.session - conn = ssn.connection - ssn.close() - conn.close() - self.pump() - - def test_closing_session(self): - self.snd.open() - self.rcv.open() - ssn1 = self.snd.session - self.pump() - ssn1.close() - self.pump() - self.snd.close() - self.pump() - - def test_closing_connection(self): - self.snd.open() - self.rcv.open() - ssn1 = self.snd.session - c1 = ssn1.connection - self.pump() - c1.close() - self.pump() - self.snd.close() - self.pump() - - def assertEqualTermini(self, t1, t2): - assert t1.type == t2.type, (t1.type, t2.type) - assert t1.address == t2.address, (t1.address, t2.address) - assert t1.durability == t2.durability, (t1.durability, t2.durability) - assert t1.expiry_policy == t2.expiry_policy, (t1.expiry_policy, t2.expiry_policy) - assert t1.timeout == t2.timeout, (t1.timeout, t2.timeout) - assert t1.dynamic == t2.dynamic, (t1.dynamic, t2.dynamic) - for attr in ["properties", "capabilities", "outcomes", "filter"]: - d1 = getattr(t1, attr) - d2 = getattr(t2, attr) - assert d1.format() == d2.format(), (attr, d1.format(), d2.format()) - - def _test_source_target(self, config_source, config_target): - if config_source is None: - self.snd.source.type = Terminus.UNSPECIFIED - else: - config_source(self.snd.source) - if config_target is None: - self.snd.target.type = Terminus.UNSPECIFIED - else: - config_target(self.snd.target) - self.snd.open() - self.pump() - self.assertEqualTermini(self.rcv.remote_source, self.snd.source) - self.assertEqualTermini(self.rcv.remote_target, self.snd.target) - self.rcv.target.copy(self.rcv.remote_target) - self.rcv.source.copy(self.rcv.remote_source) - self.rcv.open() - self.pump() - self.assertEqualTermini(self.snd.remote_target, self.snd.target) - self.assertEqualTermini(self.snd.remote_source, self.snd.source) - - def test_source_target(self): - self._test_source_target(TerminusConfig(address="source"), - TerminusConfig(address="target")) - - def test_source(self): - self._test_source_target(TerminusConfig(address="source"), None) - - def test_target(self): - self._test_source_target(None, TerminusConfig(address="target")) - - def test_coordinator(self): - self._test_source_target(None, TerminusConfig(type=Terminus.COORDINATOR)) - - def test_source_target_full(self): - self._test_source_target(TerminusConfig(address="source", - timeout=3, - dist_mode=Terminus.DIST_MODE_MOVE, - filter=[("int", 1), ("symbol", "two"), ("string", "three")], - capabilities=["one", "two", "three"]), - TerminusConfig(address="source", - timeout=7, - capabilities=[])) - def test_distribution_mode(self): - self._test_source_target(TerminusConfig(address="source", - dist_mode=Terminus.DIST_MODE_COPY), - TerminusConfig(address="target")) - assert self.rcv.remote_source.distribution_mode == Terminus.DIST_MODE_COPY - assert self.rcv.remote_target.distribution_mode == Terminus.DIST_MODE_UNSPECIFIED - - def test_dynamic_link(self): - self._test_source_target(TerminusConfig(address=None, dynamic=True), None) - assert self.rcv.remote_source.dynamic - assert self.rcv.remote_source.address is None - - def test_condition(self): - self.snd.open() - self.rcv.open() - self.pump() - - assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - cond = Condition("blah:bleh", "this is a description", {symbol("foo"): "bar"}) - self.snd.condition = cond - self.snd.close() - - self.pump() - - assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_ACTIVE - assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - - rcond = self.rcv.remote_condition - assert rcond == cond, (rcond, cond) - - def test_settle_mode(self): - self.snd.snd_settle_mode = Link.SND_UNSETTLED - assert self.snd.snd_settle_mode == Link.SND_UNSETTLED - self.rcv.rcv_settle_mode = Link.RCV_SECOND - assert self.rcv.rcv_settle_mode == Link.RCV_SECOND - - assert self.snd.remote_rcv_settle_mode != Link.RCV_SECOND - assert self.rcv.remote_snd_settle_mode != Link.SND_UNSETTLED - - self.snd.open() - self.rcv.open() - self.pump() - - assert self.snd.remote_rcv_settle_mode == Link.RCV_SECOND - assert self.rcv.remote_snd_settle_mode == Link.SND_UNSETTLED - - def test_max_message_size(self): - assert self.snd.max_message_size == 0 - assert self.rcv.remote_max_message_size == 0 - self.snd.max_message_size = 13579 - self.snd.open() - self.rcv.open() - self.pump() - assert self.rcv.remote_max_message_size == 13579 - - def test_cleanup(self): - snd, rcv = self.link("test-link") - snd.open() - rcv.open() - self.pump() - assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - snd.close() - snd.free() - del snd - gc.collect() - self.pump() - assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - -class TerminusConfig: - - def __init__(self, type=None, address=None, timeout=None, durability=None, - filter=None, capabilities=None, dynamic=False, dist_mode=None): - self.address = address - self.timeout = timeout - self.durability = durability - self.filter = filter - self.capabilities = capabilities - self.dynamic = dynamic - self.dist_mode = dist_mode - self.type = type - - def __call__(self, terminus): - if self.type is not None: - terminus.type = self.type - if self.address is not None: - terminus.address = self.address - if self.timeout is not None: - terminus.timeout = self.timeout - if self.durability is not None: - terminus.durability = self.durability - if self.capabilities is not None: - terminus.capabilities.put_array(False, Data.SYMBOL) - terminus.capabilities.enter() - for c in self.capabilities: - terminus.capabilities.put_symbol(c) - if self.filter is not None: - terminus.filter.put_map() - terminus.filter.enter() - for (t, v) in self.filter: - setter = getattr(terminus.filter, "put_%s" % t) - setter(v) - if self.dynamic: - terminus.dynamic = True - if self.dist_mode is not None: - terminus.distribution_mode = self.dist_mode - -class TransferTest(Test): - - def setUp(self): - gc.enable() - self.snd, self.rcv = self.link("test-link") - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - - def cleanup(self): - # release resources created by this class - super(TransferTest, self).cleanup() - self.c1 = None - self.c2 = None - self.snd = None - self.rcv = None - - def tearDown(self): - self.cleanup() - gc.collect() - assert not gc.garbage - - def test_work_queue(self): - assert self.c1.work_head is None - self.snd.delivery("tag") - assert self.c1.work_head is None - self.rcv.flow(1) - self.pump() - d = self.c1.work_head - assert d is not None - tag = d.tag - assert tag == "tag", tag - assert d.writable - - n = self.snd.send(b"this is a test") - assert self.snd.advance() - assert self.c1.work_head is None - - self.pump() - - d = self.c2.work_head - assert d.tag == "tag" - assert d.readable - - def test_multiframe(self): - self.rcv.flow(1) - self.snd.delivery("tag") - msg = b"this is a test" - n = self.snd.send(msg) - assert n == len(msg) - - self.pump() - - d = self.rcv.current - assert d - assert d.tag == "tag", repr(d.tag) - assert d.readable - - binary = self.rcv.recv(1024) - assert binary == msg, (binary, msg) - - binary = self.rcv.recv(1024) - assert binary == b"" - - msg = b"this is more" - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump() - - binary = self.rcv.recv(1024) - assert binary == msg, (binary, msg) - - binary = self.rcv.recv(1024) - assert binary is None - - def test_disposition(self): - self.rcv.flow(1) - - self.pump() - - sd = self.snd.delivery("tag") - msg = b"this is a test" - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump() - - rd = self.rcv.current - assert rd is not None - assert rd.tag == sd.tag - rmsg = self.rcv.recv(1024) - assert rmsg == msg - rd.update(Delivery.ACCEPTED) - - self.pump() - - rdisp = sd.remote_state - ldisp = rd.local_state - assert rdisp == ldisp == Delivery.ACCEPTED, (rdisp, ldisp) - assert sd.updated - - sd.update(Delivery.ACCEPTED) - - self.pump() - - assert sd.local_state == rd.remote_state == Delivery.ACCEPTED - sd.settle() - - def test_delivery_id_ordering(self): - self.rcv.flow(1024) - self.pump(buffer_size=64*1024) - - #fill up delivery buffer on sender - for m in range(1024): - sd = self.snd.delivery("tag%s" % m) - msg = ("message %s" % m).encode('ascii') - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump(buffer_size=64*1024) - - #receive a session-windows worth of messages and accept them - for m in range(1024): - rd = self.rcv.current - assert rd is not None, m - assert rd.tag == ("tag%s" % m), (rd.tag, m) - msg = self.rcv.recv(1024) - assert msg == ("message %s" % m).encode('ascii'), (msg, m) - rd.update(Delivery.ACCEPTED) - rd.settle() - - self.pump(buffer_size=64*1024) - - #add some new deliveries - for m in range(1024, 1450): - sd = self.snd.delivery("tag%s" % m) - msg = ("message %s" % m).encode('ascii') - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - #handle all disposition changes to sent messages - d = self.c1.work_head - while d: - next_d = d.work_next - if d.updated: - d.update(Delivery.ACCEPTED) - d.settle() - d = next_d - - #submit some more deliveries - for m in range(1450, 1500): - sd = self.snd.delivery("tag%s" % m) - msg = ("message %s" % m).encode('ascii') - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump(buffer_size=64*1024) - self.rcv.flow(1024) - self.pump(buffer_size=64*1024) - - #verify remaining messages can be received and accepted - for m in range(1024, 1500): - rd = self.rcv.current - assert rd is not None, m - assert rd.tag == ("tag%s" % m), (rd.tag, m) - msg = self.rcv.recv(1024) - assert msg == ("message %s" % m).encode('ascii'), (msg, m) - rd.update(Delivery.ACCEPTED) - rd.settle() - - def test_cleanup(self): - self.rcv.flow(10) - self.pump() - - for x in range(10): - self.snd.delivery("tag%d" % x) - msg = b"this is a test" - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - self.snd.close() - self.snd.free() - self.snd = None - gc.collect() - - self.pump() - - for x in range(10): - rd = self.rcv.current - assert rd is not None - assert rd.tag == "tag%d" % x - rmsg = self.rcv.recv(1024) - assert self.rcv.advance() - assert rmsg == msg - # close of snd should've settled: - assert rd.settled - rd.settle() - -class MaxFrameTransferTest(Test): - - def setUp(self): - pass - - def cleanup(self): - # release resources created by this class - super(MaxFrameTransferTest, self).cleanup() - self.c1 = None - self.c2 = None - self.snd = None - self.rcv = None - - def tearDown(self): - self.cleanup() - - def message(self, size): - parts = [] - for i in range(size): - parts.append(str(i)) - return "/".join(parts)[:size].encode("utf-8") - - def testMinFrame(self): - """ - Configure receiver to support minimum max-frame as defined by AMQP-1.0. - Verify transfer of messages larger than 512. - """ - self.snd, self.rcv = self.link("test-link", max_frame=[0,512]) - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - assert self.rcv.session.connection.transport.max_frame_size == 512 - assert self.snd.session.connection.transport.remote_max_frame_size == 512 - - self.rcv.flow(1) - self.snd.delivery("tag") - msg = self.message(513) - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump() - - binary = self.rcv.recv(513) - assert binary == msg - - binary = self.rcv.recv(1024) - assert binary == None - - def testOddFrame(self): - """ - Test an odd sized max limit with data that will require multiple frames to - be transfered. - """ - self.snd, self.rcv = self.link("test-link", max_frame=[0,521]) - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - assert self.rcv.session.connection.transport.max_frame_size == 521 - assert self.snd.session.connection.transport.remote_max_frame_size == 521 - - self.rcv.flow(2) - self.snd.delivery("tag") - msg = ("X" * 1699).encode('utf-8') - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump() - - binary = self.rcv.recv(1699) - assert binary == msg - - binary = self.rcv.recv(1024) - assert binary == None - - self.rcv.advance() - - self.snd.delivery("gat") - msg = self.message(1426) - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump() - - binary = self.rcv.recv(1426) - assert binary == msg - - self.pump() - - binary = self.rcv.recv(1024) - assert binary == None - - def testSendQueuedMultiFrameMessages(self, sendSingleFrameMsg = False): - """ - Test that multiple queued messages on the same link - with multi-frame content are sent correctly. Use an - odd max frame size, send enough data to use many. - """ - self.snd, self.rcv = self.link("test-link", max_frame=[0,517]) - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - assert self.rcv.session.connection.transport.max_frame_size == 517 - assert self.snd.session.connection.transport.remote_max_frame_size == 517 - - self.rcv.flow(5) - - self.pump() - - # Send a delivery with 5 frames, all bytes as X1234 - self.snd.delivery("tag") - msg = ("X1234" * 425).encode('utf-8') - assert 2125 == len(msg) - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - # Send a delivery with 5 frames, all bytes as Y5678 - self.snd.delivery("tag2") - msg2 = ("Y5678" * 425).encode('utf-8') - assert 2125 == len(msg2) - n = self.snd.send(msg2) - assert n == len(msg2) - assert self.snd.advance() - - self.pump() - - if sendSingleFrameMsg: - # Send a delivery with 1 frame - self.snd.delivery("tag3") - msg3 = ("Z").encode('utf-8') - assert 1 == len(msg3) - n = self.snd.send(msg3) - assert n == len(msg3) - assert self.snd.advance() - self.pump() - - binary = self.rcv.recv(5000) - self.assertEqual(binary, msg) - - self.rcv.advance() - - binary2 = self.rcv.recv(5000) - self.assertEqual(binary2, msg2) - - self.rcv.advance() - - if sendSingleFrameMsg: - binary3 = self.rcv.recv(5000) - self.assertEqual(binary3, msg3) - self.rcv.advance() - - self.pump() - - def testSendQueuedMultiFrameMessagesThenSingleFrameMessage(self): - self.testSendQueuedMultiFrameMessages(sendSingleFrameMsg = True) - - def testBigMessage(self): - """ - Test transferring a big message. - """ - self.snd, self.rcv = self.link("test-link") - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - - self.rcv.flow(2) - self.snd.delivery("tag") - msg = self.message(1024*256) - n = self.snd.send(msg) - assert n == len(msg) - assert self.snd.advance() - - self.pump() - - binary = self.rcv.recv(1024*256) - assert binary == msg - - binary = self.rcv.recv(1024) - assert binary == None - - -class IdleTimeoutTest(Test): - - def setUp(self): - pass - - def cleanup(self): - # release resources created by this class - super(IdleTimeoutTest, self).cleanup() - self.snd = None - self.rcv = None - self.c1 = None - self.c2 = None - - def tearDown(self): - self.cleanup() - - def message(self, size): - parts = [] - for i in range(size): - parts.append(str(i)) - return "/".join(parts)[:size] - - def testGetSet(self): - """ - Verify the configuration and negotiation of the idle timeout. - """ - - self.snd, self.rcv = self.link("test-link", idle_timeout=[1.0,2.0]) - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - # proton advertises 1/2 the configured timeout to the peer: - assert self.rcv.session.connection.transport.idle_timeout == 2.0 - assert self.rcv.session.connection.transport.remote_idle_timeout == 0.5 - assert self.snd.session.connection.transport.idle_timeout == 1.0 - assert self.snd.session.connection.transport.remote_idle_timeout == 1.0 - - def testTimeout(self): - """ - Verify the AMQP Connection idle timeout. - """ - - # snd will timeout the Connection if no frame is received within 1000 ticks - self.snd, self.rcv = self.link("test-link", idle_timeout=[1.0,0]) - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - - t_snd = self.snd.session.connection.transport - t_rcv = self.rcv.session.connection.transport - assert t_rcv.idle_timeout == 0.0 - # proton advertises 1/2 the timeout (see spec) - assert t_rcv.remote_idle_timeout == 0.5 - assert t_snd.idle_timeout == 1.0 - assert t_snd.remote_idle_timeout == 0.0 - - sndr_frames_in = t_snd.frames_input - rcvr_frames_out = t_rcv.frames_output - - # at t+1msec, nothing should happen: - clock = 0.001 - assert t_snd.tick(clock) == 1.001, "deadline for remote timeout" - assert t_rcv.tick(clock) == 0.251, "deadline to send keepalive" - self.pump() - assert sndr_frames_in == t_snd.frames_input, "unexpected received frame" - - # at one tick from expected idle frame send, nothing should happen: - clock = 0.250 - assert t_snd.tick(clock) == 1.001, "deadline for remote timeout" - assert t_rcv.tick(clock) == 0.251, "deadline to send keepalive" - self.pump() - assert sndr_frames_in == t_snd.frames_input, "unexpected received frame" - - # this should cause rcvr to expire and send a keepalive - clock = 0.251 - assert t_snd.tick(clock) == 1.001, "deadline for remote timeout" - assert t_rcv.tick(clock) == 0.501, "deadline to send keepalive" - self.pump() - sndr_frames_in += 1 - rcvr_frames_out += 1 - assert sndr_frames_in == t_snd.frames_input, "unexpected received frame" - assert rcvr_frames_out == t_rcv.frames_output, "unexpected frame" - - # since a keepalive was received, sndr will rebase its clock against this tick: - # and the receiver should not change its deadline - clock = 0.498 - assert t_snd.tick(clock) == 1.498, "deadline for remote timeout" - assert t_rcv.tick(clock) == 0.501, "deadline to send keepalive" - self.pump() - assert sndr_frames_in == t_snd.frames_input, "unexpected received frame" - - # now expire sndr - clock = 1.499 - t_snd.tick(clock) - self.pump() - assert self.c2.state & Endpoint.REMOTE_CLOSED - assert self.c2.remote_condition.name == "amqp:resource-limit-exceeded" - -class CreditTest(Test): - - def setUp(self): - self.snd, self.rcv = self.link("test-link", max_frame=(16*1024, 16*1024)) - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - - def cleanup(self): - # release resources created by this class - super(CreditTest, self).cleanup() - self.c1 = None - self.snd = None - self.c2 = None - self.rcv2 = None - self.snd2 = None - - def tearDown(self): - self.cleanup() - - def testCreditSender(self, count=1024): - credit = self.snd.credit - assert credit == 0, credit - self.rcv.flow(10) - self.pump() - credit = self.snd.credit - assert credit == 10, credit - - self.rcv.flow(count) - self.pump() - credit = self.snd.credit - assert credit == 10 + count, credit - - def testCreditReceiver(self): - self.rcv.flow(10) - self.pump() - assert self.rcv.credit == 10, self.rcv.credit - - d = self.snd.delivery("tag") - assert d - assert self.snd.advance() - self.pump() - assert self.rcv.credit == 10, self.rcv.credit - assert self.rcv.queued == 1, self.rcv.queued - c = self.rcv.current - assert c.tag == "tag", c.tag - assert self.rcv.advance() - assert self.rcv.credit == 9, self.rcv.credit - assert self.rcv.queued == 0, self.rcv.queued - - def _testBufferingOnClose(self, a, b): - for i in range(10): - d = self.snd.delivery("tag-%s" % i) - assert d - d.settle() - self.pump() - assert self.snd.queued == 10 - - endpoints = {"connection": (self.c1, self.c2), - "session": (self.snd.session, self.rcv.session), - "link": (self.snd, self.rcv)} - - local_a, remote_a = endpoints[a] - local_b, remote_b = endpoints[b] - - remote_b.close() - self.pump() - assert local_b.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED - local_a.close() - self.pump() - assert remote_a.state & Endpoint.REMOTE_CLOSED - assert self.snd.queued == 10 - - def testBufferingOnCloseLinkLink(self): - self._testBufferingOnClose("link", "link") - - def testBufferingOnCloseLinkSession(self): - self._testBufferingOnClose("link", "session") - - def testBufferingOnCloseLinkConnection(self): - self._testBufferingOnClose("link", "connection") - - def testBufferingOnCloseSessionLink(self): - self._testBufferingOnClose("session", "link") - - def testBufferingOnCloseSessionSession(self): - self._testBufferingOnClose("session", "session") - - def testBufferingOnCloseSessionConnection(self): - self._testBufferingOnClose("session", "connection") - - def testBufferingOnCloseConnectionLink(self): - self._testBufferingOnClose("connection", "link") - - def testBufferingOnCloseConnectionSession(self): - self._testBufferingOnClose("connection", "session") - - def testBufferingOnCloseConnectionConnection(self): - self._testBufferingOnClose("connection", "connection") - - def testFullDrain(self): - assert self.rcv.credit == 0 - assert self.snd.credit == 0 - self.rcv.drain(10) - assert self.rcv.draining() - assert self.rcv.credit == 10 - assert self.snd.credit == 0 - self.pump() - assert self.rcv.credit == 10 - assert self.snd.credit == 10 - assert self.rcv.draining() - self.snd.drained() - assert self.rcv.credit == 10 - assert self.snd.credit == 0 - assert self.rcv.draining() - self.pump() - assert self.rcv.credit == 0 - assert self.snd.credit == 0 - assert not self.rcv.draining() - drained = self.rcv.drained() - assert drained == 10, drained - - def testPartialDrain(self): - self.rcv.drain(2) - assert self.rcv.draining() - self.pump() - - d = self.snd.delivery("tag") - assert d - assert self.snd.advance() - self.snd.drained() - assert self.rcv.draining() - self.pump() - assert not self.rcv.draining() - - c = self.rcv.current - assert self.rcv.queued == 1, self.rcv.queued - assert c.tag == d.tag, c.tag - assert self.rcv.advance() - assert not self.rcv.current - assert self.rcv.credit == 0, self.rcv.credit - assert not self.rcv.draining() - drained = self.rcv.drained() - assert drained == 1, drained - - def testDrainFlow(self): - assert self.rcv.credit == 0 - assert self.snd.credit == 0 - self.rcv.drain(10) - assert self.rcv.credit == 10 - assert self.snd.credit == 0 - self.pump() - assert self.rcv.credit == 10 - assert self.snd.credit == 10 - self.snd.drained() - assert self.rcv.credit == 10 - assert self.snd.credit == 0 - self.pump() - assert self.rcv.credit == 0 - assert self.snd.credit == 0 - self.rcv.flow(10) - assert self.rcv.credit == 10 - assert self.snd.credit == 0 - self.pump() - assert self.rcv.credit == 10 - assert self.snd.credit == 10 - self.snd.drained() - assert self.rcv.credit == 10 - assert self.snd.credit == 10 - self.pump() - assert self.rcv.credit == 10 - assert self.snd.credit == 10 - drained = self.rcv.drained() - assert drained == 10, drained - - def testNegative(self): - assert self.snd.credit == 0 - d = self.snd.delivery("tag") - assert d - assert self.snd.advance() - self.pump() - - assert self.rcv.credit == 0 - assert self.rcv.queued == 0 - - self.rcv.flow(1) - assert self.rcv.credit == 1 - assert self.rcv.queued == 0 - self.pump() - assert self.rcv.credit == 1 - assert self.rcv.queued == 1, self.rcv.queued - - c = self.rcv.current - assert c - assert c.tag == "tag" - assert self.rcv.advance() - assert self.rcv.credit == 0 - assert self.rcv.queued == 0 - - def testDrainZero(self): - assert self.snd.credit == 0 - assert self.rcv.credit == 0 - assert self.rcv.queued == 0 - drained = self.rcv.drained() - assert drained == 0 - - self.rcv.flow(10) - self.pump() - assert self.snd.credit == 10 - assert self.rcv.credit == 10 - assert self.rcv.queued == 0 - - self.snd.drained() - self.pump() - assert self.snd.credit == 10 - assert self.rcv.credit == 10 - assert self.rcv.queued == 0 - drained = self.rcv.drained() - assert drained == 0 - - self.rcv.drain(0) - assert self.snd.credit == 10 - assert self.rcv.credit == 10 - assert self.rcv.queued == 0 - - self.pump() - - assert self.snd.credit == 10 - assert self.rcv.credit == 10 - assert self.rcv.queued == 0 - - self.snd.drained() - assert self.snd.credit == 0 - assert self.rcv.credit == 10 - assert self.rcv.queued == 0 - drained = self.rcv.drained() - assert drained == 0 - self.pump() - - assert self.snd.credit == 0 - assert self.rcv.credit == 0 - assert self.rcv.queued == 0 - drained = self.rcv.drained() - assert drained == 10 - - - def testDrainOrder(self): - """ Verify drain/drained works regardless of ordering. See PROTON-401 - """ - assert self.snd.credit == 0 - assert self.rcv.credit == 0 - assert self.rcv.queued == 0 - - #self.rcv.session.connection.transport.trace(Transport.TRACE_FRM) - #self.snd.session.connection.transport.trace(Transport.TRACE_FRM) - - ## verify that a sender that has reached the drain state will respond - ## promptly to a drain issued by the peer. - self.rcv.flow(10) - self.pump() - assert self.snd.credit == 10, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - sd = self.snd.delivery("tagA") - assert sd - n = self.snd.send(b"A") - assert n == 1 - self.pump() - self.snd.advance() - - # done sending, so signal that we are drained: - self.snd.drained() - self.pump() - assert self.snd.credit == 9, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - self.rcv.drain(0) - self.pump() - assert self.snd.credit == 9, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - data = self.rcv.recv(10) - assert data == b"A", data - self.rcv.advance() - self.pump() - assert self.snd.credit == 9, self.snd.credit - assert self.rcv.credit == 9, self.rcv.credit - - self.snd.drained() - self.pump() - assert self.snd.credit == 0, self.snd.credit - assert self.rcv.credit == 0, self.rcv.credit - - # verify that a drain requested by the peer is not "acknowledged" until - # after the sender has completed sending its pending messages - - self.rcv.flow(10) - self.pump() - assert self.snd.credit == 10, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - sd = self.snd.delivery("tagB") - assert sd - n = self.snd.send(b"B") - assert n == 1 - self.snd.advance() - self.pump() - assert self.snd.credit == 9, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - self.rcv.drain(0) - self.pump() - assert self.snd.credit == 9, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - sd = self.snd.delivery("tagC") - assert sd - n = self.snd.send(b"C") - assert n == 1 - self.snd.advance() - self.pump() - assert self.snd.credit == 8, self.snd.credit - assert self.rcv.credit == 10, self.rcv.credit - - # now that the sender has finished sending everything, it can signal - # drained - self.snd.drained() - self.pump() - assert self.snd.credit == 0, self.snd.credit - assert self.rcv.credit == 2, self.rcv.credit - - data = self.rcv.recv(10) - assert data == b"B", data - self.rcv.advance() - data = self.rcv.recv(10) - assert data == b"C", data - self.rcv.advance() - self.pump() - assert self.snd.credit == 0, self.snd.credit - assert self.rcv.credit == 0, self.rcv.credit - - - def testPushback(self, count=10): - assert self.snd.credit == 0 - assert self.rcv.credit == 0 - - self.rcv.flow(count) - self.pump() - - for i in range(count): - d = self.snd.delivery("tag%s" % i) - assert d - self.snd.advance() - - assert self.snd.queued == count - assert self.rcv.queued == 0 - self.pump() - assert self.snd.queued == 0 - assert self.rcv.queued == count - - d = self.snd.delivery("extra") - self.snd.advance() - - assert self.snd.queued == 1 - assert self.rcv.queued == count - self.pump() - assert self.snd.queued == 1 - assert self.rcv.queued == count - - def testHeadOfLineBlocking(self): - self.snd2 = self.snd.session.sender("link-2") - self.rcv2 = self.rcv.session.receiver("link-2") - self.snd2.open() - self.rcv2.open() - self.pump() - assert self.snd2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - assert self.rcv2.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE - - self.rcv.flow(5) - self.rcv2.flow(10) - self.pump() - - assert self.snd.credit == 5 - assert self.snd2.credit == 10 - - for i in range(10): - tag = "test %d" % i - self.snd.delivery( tag ) - self.snd.send( tag.encode("ascii") ) - assert self.snd.advance() - self.snd2.delivery( tag ) - self.snd2.send( tag.encode("ascii") ) - assert self.snd2.advance() - - self.pump() - - for i in range(5): - b = self.rcv.recv( 512 ) - assert self.rcv.advance() - b = self.rcv2.recv( 512 ) - assert self.rcv2.advance() - - for i in range(5): - b = self.rcv2.recv( 512 ) - assert self.rcv2.advance() - - - -class SessionCreditTest(Test): - - def tearDown(self): - self.cleanup() - - def testBuffering(self, count=32, size=1024, capacity=16*1024, max_frame=1024): - snd, rcv = self.link("test-link", max_frame=(max_frame, max_frame)) - rcv.session.incoming_capacity = capacity - snd.open() - rcv.open() - rcv.flow(count) - self.pump() - - assert count > 0 - - total_bytes = count * size - - assert snd.session.outgoing_bytes == 0, snd.session.outgoing_bytes - assert rcv.session.incoming_bytes == 0, rcv.session.incoming_bytes - assert snd.queued == 0, snd.queued - assert rcv.queued == 0, rcv.queued - - data = bytes(bytearray(size)) - idx = 0 - while snd.credit: - d = snd.delivery("tag%s" % idx) - assert d - n = snd.send(data) - assert n == size, (n, size) - assert snd.advance() - self.pump() - idx += 1 - - assert idx == count, (idx, count) - - assert snd.session.outgoing_bytes < total_bytes, (snd.session.outgoing_bytes, total_bytes) - assert rcv.session.incoming_bytes < capacity, (rcv.session.incoming_bytes, capacity) - assert snd.session.outgoing_bytes + rcv.session.incoming_bytes == total_bytes, \ - (snd.session.outgoing_bytes, rcv.session.incoming_bytes, total_bytes) - if snd.session.outgoing_bytes > 0: - available = rcv.session.incoming_capacity - rcv.session.incoming_bytes - assert available < max_frame, (available, max_frame) - - for i in range(count): - d = rcv.current - assert d, i - pending = d.pending - before = rcv.session.incoming_bytes - assert rcv.advance() - after = rcv.session.incoming_bytes - assert before - after == pending, (before, after, pending) - snd_before = snd.session.incoming_bytes - self.pump() - snd_after = snd.session.incoming_bytes - - assert rcv.session.incoming_bytes < capacity - if snd_before > 0: - assert capacity - after <= max_frame - assert snd_before > snd_after - if snd_after > 0: - available = rcv.session.incoming_capacity - rcv.session.incoming_bytes - assert available < max_frame, available - - def testBufferingSize16(self): - self.testBuffering(size=16) - - def testBufferingSize256(self): - self.testBuffering(size=256) - - def testBufferingSize512(self): - self.testBuffering(size=512) - - def testBufferingSize2048(self): - self.testBuffering(size=2048) - - def testBufferingSize1025(self): - self.testBuffering(size=1025) - - def testBufferingSize1023(self): - self.testBuffering(size=1023) - - def testBufferingSize989(self): - self.testBuffering(size=989) - - def testBufferingSize1059(self): - self.testBuffering(size=1059) - - def testCreditWithBuffering(self): - snd, rcv = self.link("test-link", max_frame=(1024, 1024)) - rcv.session.incoming_capacity = 64*1024 - snd.open() - rcv.open() - rcv.flow(128) - self.pump() - - assert snd.credit == 128, snd.credit - assert rcv.queued == 0, rcv.queued - - idx = 0 - while snd.credit: - d = snd.delivery("tag%s" % idx) - snd.send(("x"*1024).encode('ascii')) - assert d - assert snd.advance() - self.pump() - idx += 1 - - assert idx == 128, idx - assert rcv.queued < 128, rcv.queued - - rcv.flow(1) - self.pump() - assert snd.credit == 1, snd.credit - -class SettlementTest(Test): - - def setUp(self): - self.snd, self.rcv = self.link("test-link") - self.c1 = self.snd.session.connection - self.c2 = self.rcv.session.connection - self.snd.open() - self.rcv.open() - self.pump() - - def cleanup(self): - # release resources created by this class - super(SettlementTest, self).cleanup() - self.c1 = None - self.snd = None - self.c2 = None - self.rcv2 = None - self.snd2 = None - - def tearDown(self): - self.cleanup() - - def testSettleCurrent(self): - self.rcv.flow(10) - self.pump() - - assert self.snd.credit == 10, self.snd.credit - d = self.snd.delivery("tag") - e = self.snd.delivery("tag2") - assert d - assert e - c = self.snd.current - assert c.tag == "tag", c.tag - c.settle() - c = self.snd.current - assert c.tag == "tag2", c.tag - c.settle() - c = self.snd.current - assert not c - self.pump() - - c = self.rcv.current - assert c - assert c.tag == "tag", c.tag - assert c.settled - c.settle() - c = self.rcv.current - assert c - assert c.tag == "tag2", c.tag - assert c.settled - c.settle() - c = self.rcv.current - assert not c - - def testUnsettled(self): - self.rcv.flow(10) - self.pump() - - assert self.snd.unsettled == 0, self.snd.unsettled - assert self.rcv.unsettled == 0, self.rcv.unsettled - - d = self.snd.delivery("tag") - assert d - assert self.snd.unsettled == 1, self.snd.unsettled - assert self.rcv.unsettled == 0, self.rcv.unsettled - assert self.snd.advance() - self.pump() - - assert self.snd.unsettled == 1, self.snd.unsettled - assert self.rcv.unsettled == 1, self.rcv.unsettled - - c = self.rcv.current - assert c - c.settle() - - assert self.snd.unsettled == 1, self.snd.unsettled - assert self.rcv.unsettled == 0, self.rcv.unsettled - - def testMultipleUnsettled(self, count=1024, size=1024): - self.rcv.flow(count) - self.pump() - - assert self.snd.unsettled == 0, self.snd.unsettled - assert self.rcv.unsettled == 0, self.rcv.unsettled - - unsettled = [] - - for i in range(count): - sd = self.snd.delivery("tag%s" % i) - assert sd - n = self.snd.send(("x"*size).encode('ascii')) - assert n == size, n - assert self.snd.advance() - self.pump() - - rd = self.rcv.current - assert rd, "did not receive delivery %s" % i - n = rd.pending - b = self.rcv.recv(n) - assert len(b) == n, (b, n) - rd.update(Delivery.ACCEPTED) - assert self.rcv.advance() - self.pump() - unsettled.append(rd) - - assert self.rcv.unsettled == count - - for rd in unsettled: - rd.settle() - - def testMultipleUnsettled2K1K(self): - self.testMultipleUnsettled(2048, 1024) - - def testMultipleUnsettled4K1K(self): - self.testMultipleUnsettled(4096, 1024) - - def testMultipleUnsettled1K2K(self): - self.testMultipleUnsettled(1024, 2048) - - def testMultipleUnsettled2K2K(self): - self.testMultipleUnsettled(2048, 2048) - - def testMultipleUnsettled4K2K(self): - self.testMultipleUnsettled(4096, 2048) - -class PipelineTest(Test): - - def setUp(self): - self.c1, self.c2 = self.connection() - - def cleanup(self): - # release resources created by this class - super(PipelineTest, self).cleanup() - self.c1 = None - self.c2 = None - - def tearDown(self): - self.cleanup() - - def test(self): - ssn = self.c1.session() - snd = ssn.sender("sender") - self.c1.open() - ssn.open() - snd.open() - - for i in range(10): - t = "delivery-%s" % i - d = snd.delivery(t) - snd.send(t.encode('ascii')) - d.settle() - - snd.close() - ssn.close() - self.c1.close() - - self.pump() - - state = self.c2.state - assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state - ssn2 = self.c2.session_head(Endpoint.LOCAL_UNINIT) - assert ssn2 - state == ssn2.state - assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state - rcv = self.c2.link_head(Endpoint.LOCAL_UNINIT) - assert rcv - state = rcv.state - assert state == (Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_ACTIVE), "%x" % state - - self.c2.open() - ssn2.open() - rcv.open() - rcv.flow(10) - assert rcv.queued == 0, rcv.queued - - self.pump() - - assert rcv.queued == 10, rcv.queued - state = rcv.state - assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state - state = ssn2.state - assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state - state = self.c2.state - assert state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED), "%x" % state - - for i in range(rcv.queued): - d = rcv.current - assert d - assert d.tag == "delivery-%s" % i - d.settle() - - assert rcv.queued == 0, rcv.queued - - -class ServerTest(Test): - - def testKeepalive(self): - """ Verify that idle frames are sent to keep a Connection alive - """ - idle_timeout = self.delay - server = common.TestServer() - server.start() - - class Program: - - def on_reactor_init(self, event): - self.conn = event.reactor.connection() - self.conn.hostname = "%s:%s" % (server.host, server.port) - self.conn.open() - self.old_count = None - event.reactor.schedule(3 * idle_timeout, self) - - def on_connection_bound(self, event): - event.transport.idle_timeout = idle_timeout - - def on_connection_remote_open(self, event): - self.old_count = event.transport.frames_input - - def on_timer_task(self, event): - assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection terminated" - assert self.conn.transport.frames_input > self.old_count, "No idle frames received" - self.conn.close() - - Reactor(Program()).run() - server.stop() - - def testIdleTimeout(self): - """ Verify that a Connection is terminated properly when Idle frames do not - arrive in a timely manner. - """ - idle_timeout = self.delay - server = common.TestServer(idle_timeout=idle_timeout) - server.start() - - class Program: - - def on_reactor_init(self, event): - self.conn = event.reactor.connection() - self.conn.hostname = "%s:%s" % (server.host, server.port) - self.conn.open() - self.remote_condition = None - self.old_count = None - # verify the connection stays up even if we don't explicitly send stuff - # wait up to 3x the idle timeout - event.reactor.schedule(3 * idle_timeout, self) - - def on_connection_bound(self, event): - self.transport = event.transport - - def on_connection_remote_open(self, event): - self.old_count = event.transport.frames_output - - def on_connection_remote_close(self, event): - assert self.conn.remote_condition - assert self.conn.remote_condition.name == "amqp:resource-limit-exceeded" - self.remote_condition = self.conn.remote_condition - - def on_timer_task(self, event): - assert self.conn.state == (Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE), "Connection terminated" - assert self.conn.transport.frames_output > self.old_count, "No idle frames sent" - - # now wait to explicitly cause the other side to expire: - suspend_time = 3 * idle_timeout - if os.name=="nt": - # On windows, the full delay gets too close to the graceful/hard close tipping point - suspend_time = 2.5 * idle_timeout - sleep(suspend_time) - - p = Program() - Reactor(p).run() - assert p.remote_condition - assert p.remote_condition.name == "amqp:resource-limit-exceeded" - server.stop() - -class NoValue: - - def __init__(self): - pass - - def apply(self, dlv): - pass - - def check(self, dlv): - assert dlv.data == None - assert dlv.section_number == 0 - assert dlv.section_offset == 0 - assert dlv.condition == None - assert dlv.failed == False - assert dlv.undeliverable == False - assert dlv.annotations == None - -class RejectValue: - def __init__(self, condition): - self.condition = condition - - def apply(self, dlv): - dlv.condition = self.condition - - def check(self, dlv): - assert dlv.data == None, dlv.data - assert dlv.section_number == 0 - assert dlv.section_offset == 0 - assert dlv.condition == self.condition, (dlv.condition, self.condition) - assert dlv.failed == False - assert dlv.undeliverable == False - assert dlv.annotations == None - -class ReceivedValue: - def __init__(self, section_number, section_offset): - self.section_number = section_number - self.section_offset = section_offset - - def apply(self, dlv): - dlv.section_number = self.section_number - dlv.section_offset = self.section_offset - - def check(self, dlv): - assert dlv.data == None, dlv.data - assert dlv.section_number == self.section_number, (dlv.section_number, self.section_number) - assert dlv.section_offset == self.section_offset - assert dlv.condition == None - assert dlv.failed == False - assert dlv.undeliverable == False - assert dlv.annotations == None - -class ModifiedValue: - def __init__(self, failed, undeliverable, annotations): - self.failed = failed - self.undeliverable = undeliverable - self.annotations = annotations - - def apply(self, dlv): - dlv.failed = self.failed - dlv.undeliverable = self.undeliverable - dlv.annotations = self.annotations - - def check(self, dlv): - assert dlv.data == None, dlv.data - assert dlv.section_number == 0 - assert dlv.section_offset == 0 - assert dlv.condition == None - assert dlv.failed == self.failed - assert dlv.undeliverable == self.undeliverable - assert dlv.annotations == self.annotations, (dlv.annotations, self.annotations) - -class CustomValue: - def __init__(self, data): - self.data = data - - def apply(self, dlv): - dlv.data = self.data - - def check(self, dlv): - assert dlv.data == self.data, (dlv.data, self.data) - assert dlv.section_number == 0 - assert dlv.section_offset == 0 - assert dlv.condition == None - assert dlv.failed == False - assert dlv.undeliverable == False - assert dlv.annotations == None - -class DeliveryTest(Test): - - def tearDown(self): - self.cleanup() - - def testDisposition(self, count=1, tag="tag%i", type=Delivery.ACCEPTED, value=NoValue()): - snd, rcv = self.link("test-link") - snd.open() - rcv.open() - - snd_deliveries = [] - for i in range(count): - d = snd.delivery(tag % i) - snd_deliveries.append(d) - snd.advance() - - rcv.flow(count) - self.pump() - - rcv_deliveries = [] - for i in range(count): - d = rcv.current - assert d.tag == (tag % i) - rcv_deliveries.append(d) - rcv.advance() - - for d in rcv_deliveries: - value.apply(d.local) - d.update(type) - - self.pump() - - for d in snd_deliveries: - assert d.remote_state == type - assert d.remote.type == type - value.check(d.remote) - value.apply(d.local) - d.update(type) - - self.pump() - - for d in rcv_deliveries: - assert d.remote_state == type - assert d.remote.type == type - value.check(d.remote) - - for d in snd_deliveries: - d.settle() - - self.pump() - - for d in rcv_deliveries: - assert d.settled, d.settled - d.settle() - - def testReceived(self): - self.testDisposition(type=Disposition.RECEIVED, value=ReceivedValue(1, 2)) - - def testRejected(self): - self.testDisposition(type=Disposition.REJECTED, value=RejectValue(Condition(symbol("foo")))) - - def testReleased(self): - self.testDisposition(type=Disposition.RELEASED) - - def testModified(self): - self.testDisposition(type=Disposition.MODIFIED, - value=ModifiedValue(failed=True, undeliverable=True, - annotations={"key": "value"})) - - def testCustom(self): - self.testDisposition(type=0x12345, value=CustomValue([1, 2, 3])) - -class CollectorTest(Test): - - def setUp(self): - self.collector = Collector() - - def drain(self): - result = [] - while True: - e = self.collector.peek() - if e: - result.append(e) - self.collector.pop() - else: - break - return result - - def expect(self, *types): - return self.expect_oneof(types) - - def expect_oneof(self, *sequences): - events = self.drain() - types = tuple([e.type for e in events]) - - for alternative in sequences: - if types == alternative: - if len(events) == 1: - return events[0] - elif len(events) > 1: - return events - else: - return - - assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences) - - def expect_until(self, *types): - events = self.drain() - etypes = tuple([e.type for e in events[-len(types):]]) - assert etypes == types, "actual events %s did not end in expect sequence: %s" % (events, types) - -class EventTest(CollectorTest): - - def tearDown(self): - self.cleanup() - - def testEndpointEvents(self): - c1, c2 = self.connection() - c1.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - self.pump() - self.expect() - c2.open() - self.pump() - self.expect(Event.CONNECTION_REMOTE_OPEN) - self.pump() - self.expect() - - ssn = c2.session() - snd = ssn.sender("sender") - ssn.open() - snd.open() - - self.expect() - self.pump() - self.expect(Event.SESSION_INIT, Event.SESSION_REMOTE_OPEN, - Event.LINK_INIT, Event.LINK_REMOTE_OPEN) - - c1.open() - ssn2 = c1.session() - ssn2.open() - rcv = ssn2.receiver("receiver") - rcv.open() - self.pump() - self.expect(Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, - Event.SESSION_INIT, Event.SESSION_LOCAL_OPEN, - Event.TRANSPORT, Event.LINK_INIT, Event.LINK_LOCAL_OPEN, - Event.TRANSPORT) - - rcv.close() - self.expect(Event.LINK_LOCAL_CLOSE, Event.TRANSPORT) - self.pump() - rcv.free() - del rcv - self.expect(Event.LINK_FINAL) - ssn2.free() - del ssn2 - self.pump() - c1.free() - c1.transport.unbind() - self.expect_oneof((Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL, - Event.CONNECTION_UNBOUND, Event.CONNECTION_FINAL), - (Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.LINK_FINAL, - Event.SESSION_FINAL, Event.CONNECTION_FINAL)) - - def testConnectionINIT_FINAL(self): - c = Connection() - c.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - c.free() - self.expect(Event.CONNECTION_FINAL) - - def testSessionINIT_FINAL(self): - c = Connection() - c.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - s = c.session() - self.expect(Event.SESSION_INIT) - s.free() - self.expect(Event.SESSION_FINAL) - c.free() - self.expect(Event.CONNECTION_FINAL) - - def testLinkINIT_FINAL(self): - c = Connection() - c.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - s = c.session() - self.expect(Event.SESSION_INIT) - r = s.receiver("asdf") - self.expect(Event.LINK_INIT) - r.free() - self.expect(Event.LINK_FINAL) - c.free() - self.expect(Event.SESSION_FINAL, Event.CONNECTION_FINAL) - - def testFlowEvents(self): - snd, rcv = self.link("test-link") - snd.session.connection.collect(self.collector) - rcv.open() - rcv.flow(10) - self.pump() - self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT, - Event.LINK_INIT, Event.LINK_REMOTE_OPEN, Event.LINK_FLOW) - rcv.flow(10) - self.pump() - self.expect(Event.LINK_FLOW) - return snd, rcv - - def testDeliveryEvents(self): - snd, rcv = self.link("test-link") - rcv.session.connection.collect(self.collector) - rcv.open() - rcv.flow(10) - self.pump() - self.expect(Event.CONNECTION_INIT, Event.SESSION_INIT, - Event.LINK_INIT, Event.LINK_LOCAL_OPEN, Event.TRANSPORT) - snd.delivery("delivery") - snd.send(b"Hello World!") - snd.advance() - self.pump() - self.expect() - snd.open() - self.pump() - self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY) - rcv.session.connection.transport.unbind() - rcv.session.connection.free() - self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT, Event.LINK_FINAL, - Event.SESSION_FINAL, Event.CONNECTION_FINAL) - - def testDeliveryEventsDisp(self): - snd, rcv = self.testFlowEvents() - snd.open() - dlv = snd.delivery("delivery") - snd.send(b"Hello World!") - assert snd.advance() - self.expect(Event.LINK_LOCAL_OPEN, Event.TRANSPORT) - self.pump() - self.expect(Event.LINK_FLOW) - rdlv = rcv.current - assert rdlv != None - assert rdlv.tag == "delivery" - rdlv.update(Delivery.ACCEPTED) - self.pump() - event = self.expect(Event.DELIVERY) - assert event.context == dlv, (dlv, event.context) - - def testConnectionBOUND_UNBOUND(self): - c = Connection() - c.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - t = Transport() - t.bind(c) - self.expect(Event.CONNECTION_BOUND) - t.unbind() - self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT) - - def testTransportERROR_CLOSE(self): - c = Connection() - c.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - t = Transport() - t.bind(c) - self.expect(Event.CONNECTION_BOUND) - assert t.condition is None - t.push(b"asdf") - self.expect(Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED) - assert t.condition is not None - assert t.condition.name == "amqp:connection:framing-error" - assert "AMQP header mismatch" in t.condition.description - p = t.pending() - assert p > 0 - t.pop(p) - self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) - - def testTransportCLOSED(self): - c = Connection() - c.collect(self.collector) - self.expect(Event.CONNECTION_INIT) - t = Transport() - t.bind(c) - c.open() - - self.expect(Event.CONNECTION_BOUND, Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT) - - c2 = Connection() - t2 = Transport() - t2.bind(c2) - c2.open() - c2.close() - - pump(t, t2) - - self.expect(Event.CONNECTION_REMOTE_OPEN, Event.CONNECTION_REMOTE_CLOSE, - Event.TRANSPORT_TAIL_CLOSED) - - c.close() - - pump(t, t2) - - self.expect(Event.CONNECTION_LOCAL_CLOSE, Event.TRANSPORT, - Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) - - def testLinkDetach(self): - c1 = Connection() - c1.collect(self.collector) - t1 = Transport() - t1.bind(c1) - c1.open() - s1 = c1.session() - s1.open() - l1 = s1.sender("asdf") - l1.open() - l1.detach() - self.expect_until(Event.LINK_LOCAL_DETACH, Event.TRANSPORT) - - c2 = Connection() - c2.collect(self.collector) - t2 = Transport() - t2.bind(c2) - - pump(t1, t2) - - self.expect_until(Event.LINK_REMOTE_DETACH) - -class PeerTest(CollectorTest): - - def setUp(self): - CollectorTest.setUp(self) - self.connection = Connection() - self.connection.collect(self.collector) - self.transport = Transport() - self.transport.bind(self.connection) - self.peer = Connection() - self.peer_transport = Transport() - self.peer_transport.bind(self.peer) - self.peer_transport.trace(Transport.TRACE_OFF) - - def pump(self): - pump(self.transport, self.peer_transport) - -class TeardownLeakTest(PeerTest): - - def doLeak(self, local, remote): - self.connection.open() - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, - Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT) - - ssn = self.connection.session() - ssn.open() - self.expect(Event.SESSION_INIT, Event.SESSION_LOCAL_OPEN, Event.TRANSPORT) - - snd = ssn.sender("sender") - snd.open() - self.expect(Event.LINK_INIT, Event.LINK_LOCAL_OPEN, Event.TRANSPORT) - - - self.pump() - - self.peer.open() - self.peer.session_head(0).open() - self.peer.link_head(0).open() - - self.pump() - self.expect_oneof((Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN, - Event.LINK_REMOTE_OPEN, Event.LINK_FLOW), - (Event.CONNECTION_REMOTE_OPEN, Event.SESSION_REMOTE_OPEN, - Event.LINK_REMOTE_OPEN)) - - if local: - snd.close() # ha!! - self.expect(Event.LINK_LOCAL_CLOSE, Event.TRANSPORT) - ssn.close() - self.expect(Event.SESSION_LOCAL_CLOSE, Event.TRANSPORT) - self.connection.close() - self.expect(Event.CONNECTION_LOCAL_CLOSE, Event.TRANSPORT) - - if remote: - self.peer.link_head(0).close() # ha!! - self.peer.session_head(0).close() - self.peer.close() - - self.pump() - - if remote: - self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE, - Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE, - Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_CLOSED) - else: - self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.SESSION_REMOTE_CLOSE, - Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED, - Event.TRANSPORT_CLOSED) - - self.connection.free() - self.expect(Event.LINK_FINAL, Event.SESSION_FINAL) - self.transport.unbind() - - self.expect(Event.CONNECTION_UNBOUND, Event.CONNECTION_FINAL) - - def testLocalRemoteLeak(self): - self.doLeak(True, True) - - def testLocalLeak(self): - self.doLeak(True, False) - - def testRemoteLeak(self): - self.doLeak(False, True) - - def testLeak(self): - self.doLeak(False, False) - -class IdleTimeoutEventTest(PeerTest): - - def half_pump(self): - p = self.transport.pending() - if p>0: - self.transport.pop(p) - - def testTimeoutWithZombieServer(self, expectOpenCloseFrames=True): - self.transport.idle_timeout = self.delay - self.connection.open() - self.half_pump() - t = time() - self.transport.tick(t) - self.transport.tick(t + self.delay*4) - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, - Event.CONNECTION_LOCAL_OPEN, Event.TRANSPORT, - Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED) - assert self.transport.capacity() < 0 - if expectOpenCloseFrames: - assert self.transport.pending() > 0 - self.half_pump() - self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) - assert self.transport.pending() < 0 - - def testTimeoutWithZombieServerAndSASL(self): - sasl = self.transport.sasl() - self.testTimeoutWithZombieServer(expectOpenCloseFrames=False) - -class DeliverySegFaultTest(Test): - - def testDeliveryAfterUnbind(self): - conn = Connection() - t = Transport() - ssn = conn.session() - snd = ssn.sender("sender") - dlv = snd.delivery("tag") - dlv.settle() - del dlv - t.bind(conn) - t.unbind() - dlv = snd.delivery("tag") - -class SaslEventTest(CollectorTest): - - def testAnonymousNoInitialResponse(self): - conn = Connection() - conn.collect(self.collector) - transport = Transport(Transport.SERVER) - transport.bind(conn) - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND) - - transport.push(b'AMQP\x03\x01\x00\x00\x00\x00\x00 \x02\x01\x00\x00\x00SA' - b'\xd0\x00\x00\x00\x10\x00\x00\x00\x02\xa3\tANONYMOUS@' - b'AMQP\x00\x01\x00\x00') - self.expect(Event.TRANSPORT) - for i in range(1024): - p = transport.pending() - self.drain() - p = transport.pending() - self.expect() - - def testPipelinedServerReadFirst(self): - conn = Connection() - conn.collect(self.collector) - transport = Transport(Transport.CLIENT) - s = transport.sasl() - s.allowed_mechs("ANONYMOUS PLAIN") - transport.bind(conn) - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND) - transport.push( - # SASL - b'AMQP\x03\x01\x00\x00' - # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]] - b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS' - # @sasl-outcome(68) [code=0] - b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00' - # AMQP - b'AMQP\x00\x01\x00\x00' - ) - self.expect(Event.TRANSPORT) - p = transport.pending() - bytes = transport.peek(p) - transport.pop(p) - - server = Transport(Transport.SERVER) - server.push(bytes) - assert s.outcome == SASL.OK - assert server.sasl().outcome == SASL.OK - - def testPipelinedServerWriteFirst(self): - conn = Connection() - conn.collect(self.collector) - transport = Transport(Transport.CLIENT) - s = transport.sasl() - s.allowed_mechs("ANONYMOUS") - transport.bind(conn) - p = transport.pending() - bytes = transport.peek(p) - transport.pop(p) - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND) - transport.push( - # SASL - b'AMQP\x03\x01\x00\x00' - # @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS]] - b'\x00\x00\x00\x1c\x02\x01\x00\x00\x00S@\xc0\x0f\x01\xe0\x0c\x01\xa3\tANONYMOUS' - # @sasl-outcome(68) [code=0] - b'\x00\x00\x00\x10\x02\x01\x00\x00\x00SD\xc0\x03\x01P\x00' - # AMQP - b'AMQP\x00\x01\x00\x00' - ) - self.expect(Event.TRANSPORT) - p = transport.pending() - bytes = transport.peek(p) - transport.pop(p) - assert s.outcome == SASL.OK - # XXX: the bytes above appear to be correct, but we don't get any - # sort of event indicating that the transport is authenticated
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9778eda8/tests/python/proton_tests/handler.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/handler.py b/tests/python/proton_tests/handler.py deleted file mode 100644 index c7210f4..0000000 --- a/tests/python/proton_tests/handler.py +++ /dev/null @@ -1,123 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -from __future__ import absolute_import - -import os, gc, traceback -import sys -from . import common -from time import time, sleep -from proton import * -from .common import pump, Skipped -from proton.reactor import Reactor - - -CUSTOM = EventType("custom") - -class HandlerTest(common.Test): - def test_reactorHandlerCycling(self, n=0): - - class CustomHandler(Handler): - UNSET = 999999999 - def __init__(self): - self.offset = len(traceback.extract_stack()) - def on_reactor_init(self, event): - self.depth = len(traceback.extract_stack()) - def reset(self): - self.depth = self.UNSET - @property - def init_depth(self): - d = self.depth - self.offset - return d - custom = CustomHandler() - - reactor = Reactor() - reactor.handler = custom - for i in range(n): - h = reactor.handler - reactor.handler = h - custom.reset() - reactor.run() - assert custom.init_depth < 50, "Unexpectedly long traceback for a simple handler" - - def test_reactorHandlerCycling10k(self): - self.test_reactorHandlerCycling(10000) - - def test_reactorHandlerCycling100(self): - self.test_reactorHandlerCycling(100) - - def do_customEvent(self, reactor_handler, event_root): - - class CustomHandler: - did_custom = False - did_init = False - def __init__(self, *handlers): - self.handlers = handlers - def on_reactor_init(self, event): - self.did_init = True - def on_custom(self, event): - self.did_custom = True - - class CustomInvoker(CustomHandler): - def on_reactor_init(self, event): - h = event_root(event) - event.dispatch(h, CUSTOM) - self.did_init = True - - child = CustomInvoker() - root = CustomHandler(child) - - reactor = Reactor() - - reactor_handler(reactor, root) - reactor.run() - assert root.did_init - assert child.did_init - assert root.did_custom - assert child.did_custom - - def set_root(self, reactor, root): - reactor.handler = root - def add_root(self, reactor, root): - reactor.handler.add(root) - def append_root(self, reactor, root): - reactor.handler.handlers.append(root) - - def event_root(self, event): - return event.root - - def event_reactor_handler(self, event): - return event.reactor.handler - - def test_set_handler(self): - self.do_customEvent(self.set_root, self.event_reactor_handler) - - def test_add_handler(self): - self.do_customEvent(self.add_root, self.event_reactor_handler) - - def test_append_handler(self): - self.do_customEvent(self.append_root, self.event_reactor_handler) - - def test_set_root(self): - self.do_customEvent(self.set_root, self.event_root) - - def test_add_root(self): - self.do_customEvent(self.add_root, self.event_root) - - def test_append_root(self): - self.do_customEvent(self.append_root, self.event_root) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org