PROTON-915: Send correct AMQP header upon protocol mismatch; split apart the transport tests into client and server tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3aab9a07 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3aab9a07 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3aab9a07 Branch: refs/heads/kgiusti-python3 Commit: 3aab9a07bca507aa9160e00eb54179b3df441ebb Parents: 638c18b Author: Andrew Stitcher <astitc...@apache.org> Authored: Wed Jun 17 15:44:29 2015 -0400 Committer: Andrew Stitcher <astitc...@apache.org> Committed: Wed Jun 17 18:39:03 2015 -0400 ---------------------------------------------------------------------- proton-c/src/transport/transport.c | 13 +- tests/python/proton_tests/transport.py | 190 ++++++++++++++++++++++++++-- 2 files changed, 191 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3aab9a07/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index 0e23975..733f695 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -168,6 +168,13 @@ const pn_io_layer_t pni_passthru_layer = { NULL }; +const pn_io_layer_t pni_header_error_layer = { + pn_io_layer_input_error, + pn_output_write_amqp_header, + NULL, + NULL +}; + const pn_io_layer_t pni_error_layer = { pn_io_layer_input_error, pn_io_layer_output_error, @@ -286,7 +293,7 @@ ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int lay pn_do_error(transport, "amqp:connection:framing-error", "%s: '%s'%s", error, quoted, !eos ? 
"" : " (connection aborted)"); - pn_set_error_layer(transport); + transport->io_layers[layer] = &pni_header_error_layer; return 0; } @@ -2397,7 +2404,9 @@ static ssize_t pn_output_write_amqp_header(pn_transport_t* transport, unsigned i pn_transport_logf(transport, " -> %s", "AMQP"); assert(available >= 8); memmove(bytes, AMQP_HEADER, 8); - if (transport->io_layers[layer] == &amqp_write_header_layer) { + if (transport->io_layers[layer] == &pni_header_error_layer) { + transport->io_layers[layer] = &pni_error_layer; + }else if (transport->io_layers[layer] == &amqp_write_header_layer) { transport->io_layers[layer] = &amqp_layer; } else { transport->io_layers[layer] = &amqp_read_header_layer; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3aab9a07/tests/python/proton_tests/transport.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/transport.py b/tests/python/proton_tests/transport.py index e56b122..958abf7 100644 --- a/tests/python/proton_tests/transport.py +++ b/tests/python/proton_tests/transport.py @@ -24,7 +24,7 @@ from proton import * class Test(common.Test): pass -class TransportTest(Test): +class ClientTransportTest(Test): def setup(self): self.transport = Transport() @@ -87,6 +87,16 @@ class TransportTest(Test): self.transport.close_tail() self.assert_error(u'amqp:connection:framing-error') + def testProtocolNotSupported(self): + self.transport.push("AMQP\x01\x01\x0a\x00") + p = self.transport.pending() + assert p >= 8, p + bytes = self.transport.peek(p) + assert bytes[:8] == "AMQP\x00\x01\x00\x00" + self.transport.pop(p) + self.drain() + assert self.transport.closed + def testPeek(self): out = self.transport.peek(1024) assert out is not None @@ -163,30 +173,190 @@ class TransportTest(Test): self.peer.push(dat2[len(dat1):]) self.peer.push(dat3) +class ServerTransportTest(Test): + + def setup(self): + self.transport = Transport(Transport.SERVER) + self.peer = Transport() + self.conn = 
Connection() + self.peer.bind(self.conn) + + def teardown(self): + self.transport = None + self.peer = None + self.conn = None + + def drain(self): + while True: + p = self.transport.pending() + if p < 0: + return + elif p > 0: + bytes = self.transport.peek(p) + self.peer.push(bytes) + self.transport.pop(len(bytes)) + else: + assert False + + def assert_error(self, name): + assert self.conn.remote_container is None, self.conn.remote_container + self.drain() + # verify that we received an open frame + assert self.conn.remote_container is not None, self.conn.remote_container + # verify that we received a close frame + assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, self.conn.state + # verify that a framing error was reported + assert self.conn.remote_condition.name == name, self.conn.remote_condition + + # TODO: This may no longer be testing anything + def testEOS(self): + self.transport.push("") # should be a noop + self.transport.close_tail() + p = self.transport.pending() + self.drain() + assert self.transport.closed + + def testPartial(self): + self.transport.push("AMQ") # partial header + self.transport.close_tail() + p = self.transport.pending() + assert p >= 8, p + bytes = self.transport.peek(p) + assert bytes[:8] == "AMQP\x00\x01\x00\x00" + self.transport.pop(p) + self.drain() + assert self.transport.closed + + def testGarbage(self, garbage="GARBAGE_"): + self.transport.push(garbage) + p = self.transport.pending() + assert p >= 8, p + bytes = self.transport.peek(p) + assert bytes[:8] == "AMQP\x00\x01\x00\x00" + self.transport.pop(p) + self.drain() + assert self.transport.closed + + def testSmallGarbage(self): + self.testGarbage("XXX") + + def testBigGarbage(self): + self.testGarbage("GARBAGE_XXX") + + def testHeader(self): + self.transport.push("AMQP\x00\x01\x00\x00") + self.transport.close_tail() + self.assert_error(u'amqp:connection:framing-error') + + def testProtocolNotSupported(self): + 
self.transport.push("AMQP\x01\x01\x0a\x00") + p = self.transport.pending() + assert p >= 8, p + bytes = self.transport.peek(p) + assert bytes[:8] == "AMQP\x00\x01\x00\x00" + self.transport.pop(p) + self.drain() + assert self.transport.closed + + def testPeek(self): + out = self.transport.peek(1024) + assert out is not None + + def testBindAfterOpen(self): + conn = Connection() + ssn = conn.session() + conn.open() + ssn.open() + conn.container = "test-container" + conn.hostname = "test-hostname" + trn = Transport() + trn.bind(conn) + out = trn.peek(1024) + assert "test-container" in out, repr(out) + assert "test-hostname" in out, repr(out) + self.transport.push(out) + + c = Connection() + assert c.remote_container == None + assert c.remote_hostname == None + assert c.session_head(0) == None + self.transport.bind(c) + assert c.remote_container == "test-container" + assert c.remote_hostname == "test-hostname" + assert c.session_head(0) != None + + def testCloseHead(self): + n = self.transport.pending() + assert n >= 0, n + try: + self.transport.close_head() + except TransportException, e: + assert "aborted" in str(e), str(e) + n = self.transport.pending() + assert n < 0, n + + def testCloseTail(self): + n = self.transport.capacity() + assert n > 0, n + try: + self.transport.close_tail() + except TransportException, e: + assert "aborted" in str(e), str(e) + n = self.transport.capacity() + assert n < 0, n + + def testUnpairedPop(self): + conn = Connection() + self.transport.bind(conn) + + conn.hostname = "hostname" + conn.open() + + dat1 = self.transport.peek(1024) + + ssn = conn.session() + ssn.open() + + dat2 = self.transport.peek(1024) + + assert dat2[:len(dat1)] == dat1 + + snd = ssn.sender("sender") + snd.open() + + self.transport.pop(len(dat1)) + self.transport.pop(len(dat2) - len(dat1)) + dat3 = self.transport.peek(1024) + self.transport.pop(len(dat3)) + assert self.transport.peek(1024) == "" + + self.peer.push(dat1) + self.peer.push(dat2[len(dat1):]) + 
self.peer.push(dat3) + def testEOSAfterSASL(self): - srv = Transport(mode=Transport.SERVER) - srv.sasl().allowed_mechs('ANONYMOUS') + self.transport.sasl().allowed_mechs('ANONYMOUS') self.peer.sasl().allowed_mechs('ANONYMOUS') # this should send over the sasl header plus a sasl-init set up # for anonymous p = self.peer.pending() - srv.push(self.peer.peek(p)) + self.transport.push(self.peer.peek(p)) self.peer.pop(p) # now we send EOS - srv.close_tail() + self.transport.close_tail() # the server may send an error back - p = srv.pending() + p = self.transport.pending() while p>0: - self.peer.push(srv.peek(p)) - srv.pop(p) - p = srv.pending() + self.peer.push(self.transport.peek(p)) + self.transport.pop(p) + p = self.transport.pending() # server closed - assert srv.pending() < 0 + assert self.transport.pending() < 0 class LogTest(Test): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org