Signed-off-by: Angus Salkeld <asalk...@redhat.com>
---
 exec/ipc_glue.c |  146 +++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 132 insertions(+), 14 deletions(-)

diff --git a/exec/ipc_glue.c b/exec/ipc_glue.c
index f34046d..9d6e8c8 100644
--- a/exec/ipc_glue.c
+++ b/exec/ipc_glue.c
@@ -80,6 +80,12 @@ struct cs_ipcs_mapper {
        char name[256];
 };
 
+struct outq_item {
+       void *msg;
+       size_t mlen;
+       struct list_head list;
+};
+
 static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT];
 
 static int32_t cs_ipcs_job_add(enum qb_loop_priority p,        void *data, 
qb_loop_job_dispatch_fn fn);
@@ -278,6 +284,10 @@ static char * pid_to_name (pid_t pid, char *out_name, 
size_t name_len)
 
 struct cs_ipcs_conn_context {
        qb_handle_t stats_handle;
+       struct list_head outq_head;
+       int32_t queuing;
+       uint32_t queued;
+       uint32_t sent;
        char data[1];
 };
 
@@ -300,6 +310,12 @@ static void 
cs_ipcs_connection_created(qb_ipcs_connection_t *c)
 
        size += ais_service[service]->private_data_size;
        context = calloc(1, size);
+
+       list_init(&context->outq_head);
+       context->queuing = QB_FALSE;
+       context->queued = 0;
+       context->sent = 0;
+
        qb_ipcs_context_set(c, context);
 
        ais_service[service]->lib_init_fn(c);
@@ -389,12 +405,25 @@ void *cs_ipcs_private_data_get(void *conn)
 
 static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
 {
-       struct cs_ipcs_conn_context *cnx;
+       struct cs_ipcs_conn_context *context;
+       struct list_head *list, *list_next;
+       struct outq_item *outq_item;
+
        log_printf(LOG_INFO, "%s() ", __func__);
-       cnx = qb_ipcs_context_get(c);
 
-       if (cnx) {
-               free(cnx);
+       context = qb_ipcs_context_get(c);
+       if (context) {
+               for (list = context->outq_head.next;
+                       list != &context->outq_head; list = list_next) {
+
+                       list_next = list->next;
+                       outq_item = list_entry (list, struct outq_item, list);
+
+                       list_del (list);
+                       free (outq_item->msg);
+                       free (outq_item);
+               }
+               free(context);
        }
 }
 
@@ -444,24 +473,113 @@ int cs_ipcs_response_send(void *conn, const void *msg, 
size_t mlen)
        return rc;
 }
 
-int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
+static void outq_flush (void *data)
 {
-       int32_t rc = qb_ipcs_event_send(conn, msg, mlen);
-       if (rc >= 0) {
-               return 0;
+       qb_ipcs_connection_t *conn = data;
+       struct list_head *list, *list_next;
+       struct outq_item *outq_item;
+       int32_t rc;
+       struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn);
+
+       for (list = context->outq_head.next;
+               list != &context->outq_head; list = list_next) {
+
+               list_next = list->next;
+               outq_item = list_entry (list, struct outq_item, list);
+
+               rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen);
+               if (rc != outq_item->mlen) {
+                       break;
+               }
+               context->sent++;
+               context->queued--;
+
+               list_del (list);
+               free (outq_item->msg);
+               free (outq_item);
        }
-       return rc;
+       if (list_empty (&context->outq_head)) {
+               context->queuing = QB_FALSE;
+               log_printf(LOGSYS_LEVEL_INFO, "Q empty, queued:%d sent:%d.",
+                       context->queued, context->sent);
+               context->queued = 0;
+               context->sent = 0;
+               return;
+       }
+       qb_loop_job_add(corosync_poll_handle_get(), QB_LOOP_HIGH, conn, 
outq_flush);
+       if (rc < 0 && rc != -EAGAIN) {
+               log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d!", rc);
+       }
+}
+
+static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec 
*iov, uint32_t iov_len)
+{
+       int32_t rc = 0;
+       int32_t i;
+       int32_t bytes_msg = 0;
+       struct outq_item *outq_item;
+       char *write_buf = 0;
+       struct cs_ipcs_conn_context *context = qb_ipcs_context_get(conn);
+
+       for (i = 0; i < iov_len; i++) {
+               bytes_msg += iov[i].iov_len;
+       }
+
+       if (!context->queuing) {
+               assert(list_empty (&context->outq_head));
+               rc = qb_ipcs_event_sendv(conn, iov, iov_len);
+               if (rc == bytes_msg) {
+                       context->sent++;
+                       return;
+               }
+               if (rc == -EAGAIN) {
+                       context->queued = 0;
+                       context->sent = 0;
+                       context->queuing = QB_TRUE;
+                       qb_loop_job_add(corosync_poll_handle_get(), 
QB_LOOP_HIGH, conn, outq_flush);
+               } else {
+                       log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, 
expected %d!", rc, bytes_msg);
+                       return;
+               }
+       }
+       outq_item = malloc (sizeof (struct outq_item));
+       if (outq_item == NULL) {
+               qb_ipcs_disconnect(conn);
+               return;
+       }
+       outq_item->msg = malloc (bytes_msg);
+       if (outq_item->msg == NULL) {
+               free (outq_item);
+               qb_ipcs_disconnect(conn);
+               return;
+       }
+
+       write_buf = outq_item->msg;
+       for (i = 0; i < iov_len; i++) {
+               memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
+               write_buf += iov[i].iov_len;
+       }
+       outq_item->mlen = bytes_msg;
+       list_init (&outq_item->list);
+       list_add_tail (&outq_item->list, &context->outq_head);
+       context->queued++;
+}
+
+int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
+{
+       struct iovec iov;
+       iov.iov_base = (void *)msg;
+       iov.iov_len = mlen;
+       msg_send_or_queue (conn, &iov, 1);
+       return 0;
 }
 
 int cs_ipcs_dispatch_iov_send (void *conn,
        const struct iovec *iov,
        unsigned int iov_len)
 {
-       int32_t rc = qb_ipcs_event_sendv(conn, iov, iov_len);
-       if (rc >= 0) {
-               return 0;
-       }
-       return rc;
+       msg_send_or_queue(conn, iov, iov_len);
+       return 0;
 }
 
 static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
-- 
1.7.3.1

_______________________________________________
Openais mailing list
Openais@lists.linux-foundation.org
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to