Author: rhs Date: Tue May 1 20:56:28 2012 New Revision: 1332815 URL: http://svn.apache.org/viewvc?rev=1332815&view=rev Log: fixed bugs in socket shutdown; improved transport interface
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h qpid/proton/trunk/proton-c/src/driver.c qpid/proton/trunk/proton-c/src/engine/engine-internal.h qpid/proton/trunk/proton-c/src/engine/engine.c qpid/proton/trunk/proton-c/tests/engine.py Modified: qpid/proton/trunk/proton-c/include/proton/engine.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1332815&r1=1332814&r2=1332815&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/include/proton/engine.h (original) +++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue May 1 20:56:28 2012 @@ -92,14 +92,11 @@ void pn_connection_close(pn_connection_t void pn_connection_destroy(pn_connection_t *connection); // transport -pn_state_t pn_transport_state(pn_transport_t *transport); pn_error_t *pn_transport_error(pn_transport_t *transport); ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available); ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size); time_t pn_tick(pn_transport_t *transport, time_t now); void pn_trace(pn_transport_t *transport, pn_trace_t trace); -void pn_transport_open(pn_transport_t *transport); -void pn_transport_close(pn_transport_t *transport); void pn_transport_destroy(pn_transport_t *transport); // session Modified: qpid/proton/trunk/proton-c/src/driver.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1332815&r1=1332814&r2=1332815&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/driver.c (original) +++ qpid/proton/trunk/proton-c/src/driver.c Tue May 1 20:56:28 2012 @@ -398,7 +398,7 @@ static void pn_connector_read(pn_connect { ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0); if (n <= 0) { - printf("disconnected: %zi\n", n); + if (n < 0) perror("read"); ctor->status &= ~PN_SEL_RD; ctor->input_eos = true; } else { @@ -421,11 +421,12 @@ static void pn_connector_process_input(p } else if (n == 0) { break; } else { - if (n != PN_EOS) { + if (n == PN_EOS) { + pn_connector_consume(ctor, ctor->input_size); + } else { printf("error in process_input: %zi\n", n); } ctor->input_done = true; - ctor->output_done = true; break; } } @@ -438,6 +439,7 @@ static ssize_t pn_connector_read_sasl_he fprintf(stderr, "sasl header missmatch: "); pn_fprint_data(stderr, ctor->input, ctor->input_size); fprintf(stderr, "\n"); + ctor->output_done = true; return PN_ERR; } else { fprintf(stderr, " <- AMQP SASL 1.0\n"); @@ -448,6 +450,7 @@ static ssize_t pn_connector_read_sasl_he fprintf(stderr, "sasl header missmatch: "); pn_fprint_data(stderr, ctor->input, ctor->input_size); fprintf(stderr, "\n"); + ctor->output_done = true; return PN_ERR; } @@ -473,6 +476,7 @@ static ssize_t pn_connector_read_amqp_he fprintf(stderr, "amqp header missmatch: "); pn_fprint_data(stderr, ctor->input, ctor->input_size); fprintf(stderr, "\n"); + ctor->output_done = true; return PN_ERR; } else { fprintf(stderr, " <- AMQP 1.0\n"); @@ -483,6 +487,7 @@ static ssize_t pn_connector_read_amqp_he fprintf(stderr, "amqp header missmatch: "); pn_fprint_data(stderr, ctor->input, ctor->input_size); fprintf(stderr, "\n"); + ctor->output_done = true; return PN_ERR; } @@ -492,13 +497,7 @@ static ssize_t pn_connector_read_amqp_he static ssize_t pn_connector_read_amqp(pn_connector_t *ctor) { pn_transport_t *transport = ctor->transport; - size_t n = 0; - if (ctor->input_size) { - n = pn_input(transport, ctor->input, ctor->input_size); - } else if (ctor->input_eos) { - ctor->input_done = true; - } - return n; + return pn_input(transport, ctor->input, ctor->input_size); } static char *pn_connector_output(pn_connector_t *ctor) @@ -524,7 +523,6 @@ static void pn_connector_process_output( fprintf(stderr, "error in process_output: %zi\n", n); } ctor->output_done = true; - ctor->input_done = true; break; } } @@ -578,7 +576,6 @@ static ssize_t pn_connector_write_amqp_h fprintf(stderr, " -> AMQP 1.0\n"); memmove(pn_connector_output(ctor), "AMQP\x00\x01\x00\x00", 8); ctor->process_output = pn_connector_write_amqp; - pn_transport_open(ctor->transport); return 8; } @@ -618,6 +615,7 @@ void pn_connector_process(pn_connector_t d->fds[idx].revents &= ~POLLOUT; } if (c->output_size == 0 && c->input_done && c->output_done) { + fprintf(stderr, "closed\n"); pn_connector_close(c); } } Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1332815&r1=1332814&r2=1332815&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original) +++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue May 1 20:56:28 2012 @@ -101,7 +101,9 @@ struct pn_transport_t { pn_connection_t *connection; pn_dispatcher_t *disp; bool open_sent; + bool open_rcvd; bool close_sent; + bool close_rcvd; pn_session_state_t *sessions; size_t session_capacity; pn_session_state_t **channels; Modified: qpid/proton/trunk/proton-c/src/engine/engine.c URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1332815&r1=1332814&r2=1332815&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/src/engine/engine.c (original) +++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue May 1 20:56:28 2012 @@ -662,7 +662,9 @@ void pn_transport_init(pn_transport_t *t pn_dispatcher_action(transport->disp, CLOSE, "CLOSE", pn_do_close); transport->open_sent = false; + transport->open_rcvd = false; transport->close_sent = false; + transport->close_rcvd = false; transport->sessions = NULL; transport->session_capacity = 0; @@ -717,11 +719,6 @@ pn_transport_t *pn_transport(pn_connecti } } -pn_state_t pn_transport_state(pn_transport_t *transport) -{ - return transport->endpoint.state; -} - pn_error_t *pn_transport_error(pn_transport_t *transport) { return &transport->endpoint.error; @@ -1018,12 +1015,12 @@ void pn_do_error(pn_transport_t *transpo // XXX: result vsnprintf(transport->endpoint.error.description, DESCRIPTION, fmt, ap); va_end(ap); - fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.error.description); - if (!transport->close_sent) + if (!transport->close_sent) { pn_post_close(transport); - PN_SET_LOCAL(transport->endpoint.state, PN_LOCAL_CLOSED); - PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED); + transport->close_sent = true; + } transport->disp->halt = true; + fprintf(stderr, "ERROR %s %s\n", condition, transport->endpoint.error.description); } void pn_do_open(pn_dispatcher_t *disp) @@ -1286,26 +1283,30 @@ void pn_do_end(pn_dispatcher_t *disp) void pn_do_close(pn_dispatcher_t *disp) { pn_transport_t *transport = disp->context; + transport->close_rcvd = true; PN_SET_REMOTE(transport->connection->endpoint.state, PN_REMOTE_CLOSED); - PN_SET_REMOTE(transport->endpoint.state, PN_REMOTE_CLOSED); } ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available) { - if (transport->endpoint.state & PN_LOCAL_UNINIT) { - return 0; - } - - if (transport->endpoint.state & PN_LOCAL_CLOSED) { - return PN_EOS; + if (!available) { + pn_do_error(transport, "amqp:connection:framing-error", "connection aborted"); + fprintf(stderr, " <- EOS\n"); + return PN_ERR; } - if (transport->endpoint.state & PN_REMOTE_CLOSED) { + if (transport->close_rcvd) { pn_do_error(transport, "amqp:connection:framing-error", "data after close"); + fprintf(stderr, " <- EOS\n"); return PN_ERR; } - return pn_dispatcher_input(transport->disp, bytes, available); + ssize_t n = pn_dispatcher_input(transport->disp, bytes, available); + if (n >= 0 && transport->close_rcvd) { + fprintf(stderr, " <- EOS\n"); + return PN_EOS; + } + return n; } void pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) @@ -1645,11 +1646,10 @@ void pn_process(pn_transport_t *transpor ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size) { - if (!(transport->endpoint.state & PN_LOCAL_UNINIT)) { - pn_process(transport); - } + pn_process(transport); - if (!transport->disp->available && transport->endpoint.state & PN_LOCAL_CLOSED) { + if (!transport->disp->available && transport->close_sent) { + fprintf(stderr, " -> EOS\n"); return PN_EOS; } Modified: qpid/proton/trunk/proton-c/tests/engine.py URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/tests/engine.py?rev=1332815&r1=1332814&r2=1332815&view=diff ============================================================================== --- qpid/proton/trunk/proton-c/tests/engine.py (original) +++ qpid/proton/trunk/proton-c/tests/engine.py Tue May 1 20:56:28 2012 @@ -27,17 +27,17 @@ from cproton import * def pump(t1, t2): while True: cd, out1 = pn_output(t1, 1024) - assert cd >= 0, cd + assert cd >= 0 or cd == PN_EOS, (cd, out1) cd, out2 = pn_output(t2, 1024) - assert cd >= 0, cd + assert cd >= 0 or cd == PN_EOS, (cd, out2) if out1 or out2: if out1: cd = pn_input(t2, out1) - assert cd == len(out1), (cd, out1) + assert cd == PN_EOS or cd == len(out1), (cd, out1) if out2: cd = pn_input(t1, out2) - assert cd == len(out2), (cd, out2) + assert cd == PN_EOS or cd == len(out2), (cd, out2) else: return @@ -51,8 +51,6 @@ class Test: self.c2 = pn_connection() self.t1 = pn_transport(self.c1) self.t2 = pn_transport(self.c2) - pn_transport_open(self.t1) - pn_transport_open(self.t2) trc = os.environ.get("PN_TRACE_FRM") if trc and trc.lower() in ("1", "2", "yes", "true"): pn_trace(self.t1, PN_TRACE_FRM) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org