Signed-off-by: Angus Salkeld <asalk...@redhat.com> --- exec/ipc_glue.c | 146 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 132 insertions(+), 14 deletions(-)
diff --git a/exec/ipc_glue.c b/exec/ipc_glue.c index f34046d..9d6e8c8 100644 --- a/exec/ipc_glue.c +++ b/exec/ipc_glue.c @@ -80,6 +80,12 @@ struct cs_ipcs_mapper { char name[256]; }; +struct outq_item { + void *msg; + size_t mlen; + struct list_head list; +}; + static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT]; static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn); @@ -278,6 +284,10 @@ static char * pid_to_name (pid_t pid, char *out_name, size_t name_len) struct cs_ipcs_conn_context { qb_handle_t stats_handle; + struct list_head outq_head; + int32_t queuing; + uint32_t queued; + uint32_t sent; char data[1]; }; @@ -300,6 +310,12 @@ static void cs_ipcs_connection_created(qb_ipcs_connection_t *c) size += ais_service[service]->private_data_size; context = calloc(1, size); + + list_init(&context->outq_head); + context->queuing = QB_FALSE; + context->queued = 0; + context->sent = 0; + qb_ipcs_context_set(c, context); ais_service[service]->lib_init_fn(c); @@ -389,12 +405,25 @@ void *cs_ipcs_private_data_get(void *conn) static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c) { - struct cs_ipcs_conn_context *cnx; + struct cs_ipcs_conn_context *context; + struct list_head *list, *list_next; + struct outq_item *outq_item; + log_printf(LOG_INFO, "%s() ", __func__); - cnx = qb_ipcs_context_get(c); - if (cnx) { - free(cnx); + context = qb_ipcs_context_get(c); + if (context) { + for (list = context->outq_head.next; + list != &context->outq_head; list = list_next) { + + list_next = list->next; + outq_item = list_entry (list, struct outq_item, list); + + list_del (list); + free (outq_item->msg); + free (outq_item); + } + free(context); } } @@ -444,24 +473,113 @@ int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen) return rc; } -int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen) +static void outq_flush (void *data) { - int32_t rc = qb_ipcs_event_send(conn, msg, mlen); - if (rc >= 0) { - return 0; + qb_ipcs_connection_t *conn = data; + struct list_head *list, *list_next; + struct outq_item *outq_item; + int32_t rc; + struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn); + + for (list = context->outq_head.next; + list != &context->outq_head; list = list_next) { + + list_next = list->next; + outq_item = list_entry (list, struct outq_item, list); + + rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen); + if (rc != outq_item->mlen) { + break; + } + context->sent++; + context->queued--; + + list_del (list); + free (outq_item->msg); + free (outq_item); } - return rc; + if (list_empty (&context->outq_head)) { + context->queuing = QB_FALSE; + log_printf(LOGSYS_LEVEL_INFO, "Q empty, queued:%d sent:%d.", + context->queued, context->sent); + context->queued = 0; + context->sent = 0; + return; + } + qb_loop_job_add(corosync_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush); + if (rc < 0 && rc != -EAGAIN) { + log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d!", rc); + } +} + +static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec *iov, uint32_t iov_len) +{ + int32_t rc = 0; + int32_t i; + int32_t bytes_msg = 0; + struct outq_item *outq_item; + char *write_buf = 0; + struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn); + + for (i = 0; i < iov_len; i++) { + bytes_msg += iov[i].iov_len; + } + + if (!context->queuing) { + assert(list_empty (&context->outq_head)); + rc = qb_ipcs_event_sendv(conn, iov, iov_len); + if (rc == bytes_msg) { + context->sent++; + return; + } + if (rc == -EAGAIN) { + context->queued = 0; + context->sent = 0; + context->queuing = QB_TRUE; + qb_loop_job_add(corosync_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush); + } else { + log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, expected %d!", rc, bytes_msg); + return; + } + } + outq_item = malloc (sizeof (struct outq_item)); + if (outq_item == NULL) { + qb_ipcs_disconnect(conn); + return; + } + outq_item->msg = malloc (bytes_msg); + if (outq_item->msg == NULL) { + free (outq_item); + qb_ipcs_disconnect(conn); + return; + } + + write_buf = outq_item->msg; + for (i = 0; i < iov_len; i++) { + memcpy (write_buf, iov[i].iov_base, iov[i].iov_len); + write_buf += iov[i].iov_len; + } + outq_item->mlen = bytes_msg; + list_init (&outq_item->list); + list_add_tail (&outq_item->list, &context->outq_head); + context->queued++; +} + +int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen) +{ + struct iovec iov; + iov.iov_base = (void *)msg; + iov.iov_len = mlen; + msg_send_or_queue (conn, &iov, 1); + return 0; } int cs_ipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len) { - int32_t rc = qb_ipcs_event_sendv(conn, iov, iov_len); - if (rc >= 0) { - return 0; - } - return rc; + msg_send_or_queue(conn, iov, iov_len); + return 0; } static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c, -- 1.7.3.1 _______________________________________________ Openais mailing list Openais@lists.linux-foundation.org https://lists.linux-foundation.org/mailman/listinfo/openais