This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push: new c1b2861 PROTON-2479: Make reading more efficient by resizing input buffer c1b2861 is described below commit c1b28618536c67fe9ba02b8b4b65e18608c3a08c Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Thu Dec 16 20:21:52 2021 -0500 PROTON-2479: Make reading more efficient by resizing input buffer This also introduces a connection driver API that allows the input buffer resizing to be done explicitly by the proactor code. --- c/include/proton/connection_driver.h | 13 +++++++++++++ c/src/core/connection_driver.c | 5 +++++ c/src/core/engine-internal.h | 2 ++ c/src/core/transport.c | 32 +++++++++++++++++--------------- c/src/proactor/epoll.c | 6 ++++++ 5 files changed, 43 insertions(+), 15 deletions(-) diff --git a/c/include/proton/connection_driver.h b/c/include/proton/connection_driver.h index b172643..bc478bd 100644 --- a/c/include/proton/connection_driver.h +++ b/c/include/proton/connection_driver.h @@ -138,6 +138,19 @@ PN_EXTERN void pn_connection_driver_destroy(pn_connection_driver_t *); PN_EXTERN pn_connection_t *pn_connection_driver_release_connection(pn_connection_driver_t *d); /** + * Try to get a read buffer with the specified size. + * + * This will try to grow the read buffer to the specified size and then it will return whatever size + * read buffer can be got. + * + * Copy data from your input byte source to buf.start, up to buf.size. + * Call pn_connection_driver_read_done() when reading is complete. + * + * buf.size==0 means reading is not possible: no buffer space or the read side is closed. + */ +PN_EXTERN pn_rwbytes_t pn_connection_driver_read_buffer_sized(pn_connection_driver_t *, size_t n); + +/** * Get the read buffer. * * Copy data from your input byte source to buf.start, up to buf.size. diff --git a/c/src/core/connection_driver.c b/c/src/core/connection_driver.c index 803917c..6d9c085 100644 --- a/c/src/core/connection_driver.c +++ b/c/src/core/connection_driver.c @@ -87,6 +87,11 @@ void pn_connection_driver_destroy(pn_connection_driver_t *d) { memset(d, 0, sizeof(*d)); } +pn_rwbytes_t pn_connection_driver_read_buffer_sized(pn_connection_driver_t *d, size_t n) { + ssize_t cap = pni_transport_grow_capacity(d->transport, n); + return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0); +} + pn_rwbytes_t pn_connection_driver_read_buffer(pn_connection_driver_t *d) { ssize_t cap = pn_transport_capacity(d->transport); return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(d->transport)) : pn_rwbytes(0, 0); diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 5c8b6c2..09ff3ec 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -376,6 +376,8 @@ void pn_link_unbound(pn_link_t* link); void pn_ep_incref(pn_endpoint_t *endpoint); void pn_ep_decref(pn_endpoint_t *endpoint); +ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n); + #if __cplusplus } #endif diff --git a/c/src/core/transport.c b/c/src/core/transport.c index b885e38..fc5d6ab 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -2916,6 +2916,22 @@ uint64_t pn_transport_get_frames_input(const pn_transport_t *transport) return 0; } +ssize_t pni_transport_grow_capacity(pn_transport_t *transport, size_t n) { + // can we expand the size of the input buffer? + size_t size = pn_max(n, transport->input_size); + if (transport->local_max_frame) { // there is a limit to buffer size + size = pn_min(size, transport->local_max_frame); + } + if (size > transport->input_size) { + char *newbuf = (char *) pni_mem_subreallocate(pn_class(transport), transport, transport->input_buf, size ); + if (newbuf) { + transport->input_buf = newbuf; + transport->input_size = size; + } + } + return transport->input_size-transport->input_pending; +} + // input ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */ { @@ -2924,21 +2940,7 @@ ssize_t pn_transport_capacity(pn_transport_t *transport) /* <0 == done */ ssize_t capacity = transport->input_size - transport->input_pending; if ( capacity<=0 ) { - // can we expand the size of the input buffer? - int more = 0; - if (!transport->local_max_frame) { // no limit (ha!) - more = transport->input_size; - } else if (transport->local_max_frame > transport->input_size) { - more = pn_min(transport->input_size, transport->local_max_frame - transport->input_size); - } - if (more) { - char *newbuf = (char *) pni_mem_subreallocate(pn_class(transport), transport, transport->input_buf, transport->input_size + more ); - if (newbuf) { - transport->input_buf = newbuf; - transport->input_size += more; - capacity += more; - } - } + capacity = pni_transport_grow_capacity(transport, 2*transport->input_size); } return capacity; } diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 6ad043f..61e4dbd 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -1214,6 +1214,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, ssize_t n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size); if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); + // If n == rbuf.size then we should enlarge the buffer and see if there is more to read + if (n==(ssize_t)rbuf.size) { + rbuf = pn_connection_driver_read_buffer_sized(&pc->driver, n*2); + n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size); + pn_connection_driver_read_done(&pc->driver, n); + } pc->output_drained = false; pconnection_tick(pc); /* check for tick changes. */ tick_required = false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org