It's used when trans_ack changes, so that after the change we have a clean
state and/or the previous state.

Signed-off-by: Jan Friesse <[email protected]>
---
 exec/totempg.c |  175 +++++++++++++++++++++++++++++++++----------------------
 1 files changed, 105 insertions(+), 70 deletions(-)

diff --git a/exec/totempg.c b/exec/totempg.c
index 0649cab..a6319ca 100644
--- a/exec/totempg.c
+++ b/exec/totempg.c
@@ -152,16 +152,6 @@ struct totempg_mcast {
 #define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
        sizeof (struct totempg_mcast))
 
-/*
- * Local variables used for packing small messages
- */
-static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
-
-static int mcast_packed_msg_count = 0;
-
-static int totempg_reserved = 1;
-
-static unsigned int totempg_size_limit;
 
 /*
  * Function and data used to log messages
@@ -211,24 +201,41 @@ DECLARE_LIST_INIT(assembly_list_free_trans);
 DECLARE_LIST_INIT(assembly_list_free);
 
 /*
- * Staging buffer for packed messages.  Messages are staged in this buffer
- * before sending.  Multiple messages may fit which cuts down on the
- * number of mcasts sent.  If a message doesn't completely fit, then
- * the mcast header has a fragment bit set that says that there are more
- * data to follow.  fragment_size is an index into the buffer.  It indicates
- * the size of message data and where to place new message data.
- * fragment_contuation indicates whether the first packed message in
- * the buffer is a continuation of a previously packed fragment.
+ * Structure for storing totem_pg contexts so they can be switched
  */
-static unsigned char *fragmentation_data;
+struct totempg_context {
+       /*
+        * Staging buffer for packed messages.  Messages are staged in this buffer
+        * before sending.  Multiple messages may fit which cuts down on the
+        * number of mcasts sent.  If a message doesn't completely fit, then
+        * the mcast header has a fragment bit set that says that there are more
+        * data to follow.  fragment_size is an index into the buffer.  It indicates
+        * the size of message data and where to place new message data.
+        * fragment_continuation indicates whether the first packed message in
+        * the buffer is a continuation of a previously packed fragment.
+        */
+       unsigned char *fragmentation_data;
 
-static int fragment_size = 0;
+       int fragment_size;
 
-static int fragment_continuation = 0;
+       int fragment_continuation;
 
-static int totempg_waiting_transack = 0;
+       unsigned char next_fragment;
 
-static unsigned int totempg_max_handle = 0;
+       /*
+        * Local variables used for packing small messages
+        */
+       unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
+
+       int mcast_packed_msg_count;
+
+       int totempg_reserved;
+};
+
+#define TOTEMPG_NO_CONTEXTS    2
+
+static struct totempg_context totempg_contexts_array[TOTEMPG_NO_CONTEXTS];
+static struct totempg_context *totempg_active_context;
 
 struct totempg_group_instance {
        void (*deliver_fn) (
@@ -251,7 +258,9 @@ struct totempg_group_instance {
 
 DECLARE_HDB_DATABASE (totempg_groups_instance_database,NULL);
 
-static unsigned char next_fragment = 1;
+static unsigned int totempg_max_handle = 0;
+static int totempg_waiting_transack = 0;
+static unsigned int totempg_size_limit = 0;
 
 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -273,12 +282,23 @@ static int msg_count_send_ok (int msg_count);
 
 static int byte_count_send_ok (int byte_count);
 
+static void totempg_set_context(int context_no)
+{
+       totempg_active_context = &totempg_contexts_array[context_no];
+}
+
 static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
 {
        log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", 
waiting_trans_ack);
        totempg_waiting_transack = waiting_trans_ack;
+       if (!waiting_trans_ack) {
+               totempg_set_context(0);
+       } else {
+               totempg_set_context(1);
+       }
 }
 
+
 static struct assembly *assembly_ref (unsigned int nodeid)
 {
        struct assembly *assembly;
@@ -746,9 +766,10 @@ int callback_token_received_fn (enum 
totem_callback_token_type type,
        struct totempg_mcast mcast;
        struct iovec iovecs[3];
        int res;
+       struct totempg_context *con = totempg_active_context;
 
        pthread_mutex_lock (&mcast_msg_mutex);
-       if (mcast_packed_msg_count == 0) {
+       if (con->mcast_packed_msg_count == 0) {
                pthread_mutex_unlock (&mcast_msg_mutex);
                return (0);
        }
@@ -763,21 +784,21 @@ int callback_token_received_fn (enum 
totem_callback_token_type type,
         * Was the first message in this buffer a continuation of a
         * fragmented message?
         */
-       mcast.continuation = fragment_continuation;
-       fragment_continuation = 0;
+       mcast.continuation = con->fragment_continuation;
+       con->fragment_continuation = 0;
 
-       mcast.msg_count = mcast_packed_msg_count;
+       mcast.msg_count = con->mcast_packed_msg_count;
 
        iovecs[0].iov_base = (void *)&mcast;
        iovecs[0].iov_len = sizeof (struct totempg_mcast);
-       iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
-       iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
-       iovecs[2].iov_base = (void *)&fragmentation_data[0];
-       iovecs[2].iov_len = fragment_size;
+       iovecs[1].iov_base = (void *)con->mcast_packed_msg_lens;
+       iovecs[1].iov_len = con->mcast_packed_msg_count * sizeof (unsigned 
short);
+       iovecs[2].iov_base = (void *)&con->fragmentation_data[0];
+       iovecs[2].iov_len = con->fragment_size;
        res = totemmrp_mcast (iovecs, 3, 0);
 
-       mcast_packed_msg_count = 0;
-       fragment_size = 0;
+       con->mcast_packed_msg_count = 0;
+       con->fragment_size = 0;
 
        pthread_mutex_unlock (&mcast_msg_mutex);
        return (0);
@@ -791,6 +812,8 @@ int totempg_initialize (
        struct totem_config *totem_config)
 {
        int res;
+       int i;
+       struct totempg_context *con;
 
        totempg_totem_config = totem_config;
        totempg_log_level_security = 
totem_config->totem_logging_configuration.log_level_security;
@@ -801,9 +824,16 @@ int totempg_initialize (
        totempg_log_printf = 
totem_config->totem_logging_configuration.log_printf;
        totempg_subsys_id = 
totem_config->totem_logging_configuration.log_subsys_id;
 
-       fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
-       if (fragmentation_data == 0) {
-               return (-1);
+       for (i = 0; i < TOTEMPG_NO_CONTEXTS; i++) {
+               con = &totempg_contexts_array[i];
+               memset(con, 0, sizeof(*con));
+
+               con->fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
+               if (con->fragmentation_data == 0) {
+                       return (-1);
+               }
+               con->totempg_reserved = 1;
+               con->next_fragment = 1;
        }
 
        totemsrp_net_mtu_adjust (totem_config);
@@ -855,6 +885,7 @@ static int mcast_msg (
        int copy_len = 0;
        int copy_base = 0;
        int total_size = 0;
+       struct totempg_context *con = totempg_active_context;
 
        pthread_mutex_lock (&mcast_msg_mutex);
        totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1);
@@ -872,9 +903,9 @@ static int mcast_msg (
        iov_len = dest;
 
        max_packet_size = TOTEMPG_PACKET_SIZE -
-               (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
+               (sizeof (unsigned short) * (con->mcast_packed_msg_count + 1));
 
-       mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
+       con->mcast_packed_msg_lens[con->mcast_packed_msg_count] = 0;
 
        /*
         * Check if we would overwrite new message queue
@@ -884,7 +915,7 @@ static int mcast_msg (
        }
 
        if (byte_count_send_ok (total_size + sizeof(unsigned short) *
-               (mcast_packed_msg_count)) == 0) {
+               (con->mcast_packed_msg_count)) == 0) {
 
                pthread_mutex_unlock (&mcast_msg_mutex);
                return(-1);
@@ -893,7 +924,7 @@ static int mcast_msg (
        mcast.header.version = 0;
        for (i = 0; i < iov_len; ) {
                mcast.fragmented = 0;
-               mcast.continuation = fragment_continuation;
+               mcast.continuation = con->fragment_continuation;
                copy_len = iovec[i].iov_len - copy_base;
 
                /*
@@ -902,14 +933,14 @@ static int mcast_msg (
                 * fragment_buffer on exit so that max_packet_size + fragment_size
                 * doesn't exceed the size of the fragment_buffer on the next call.
                 */
-               if ((copy_len + fragment_size) <
+               if ((copy_len + con->fragment_size) <
                        (max_packet_size - sizeof (unsigned short))) {
 
-                       memcpy (&fragmentation_data[fragment_size],
+                       memcpy (&con->fragmentation_data[con->fragment_size],
                                (char *)iovec[i].iov_base + copy_base, 
copy_len);
-                       fragment_size += copy_len;
-                       mcast_packed_msg_lens[mcast_packed_msg_count] += 
copy_len;
-                       next_fragment = 1;
+                       con->fragment_size += copy_len;
+                       con->mcast_packed_msg_lens[con->mcast_packed_msg_count] 
+= copy_len;
+                       con->next_fragment = 1;
                        copy_len = 0;
                        copy_base = 0;
                        i++;
@@ -921,18 +952,18 @@ static int mcast_msg (
                } else {
                        unsigned char *data_ptr;
 
-                       copy_len = min(copy_len, max_packet_size - 
fragment_size);
+                       copy_len = min(copy_len, max_packet_size - 
con->fragment_size);
                        if( copy_len == max_packet_size )
                                data_ptr = (unsigned char *)iovec[i].iov_base + 
copy_base;
                        else {
-                               data_ptr = fragmentation_data;
-                               memcpy (&fragmentation_data[fragment_size],
+                               data_ptr = con->fragmentation_data;
+                               memcpy 
(&con->fragmentation_data[con->fragment_size],
                                (unsigned char *)iovec[i].iov_base + copy_base, 
copy_len);
                        }
 
-                       memcpy (&fragmentation_data[fragment_size],
+                       memcpy (&con->fragmentation_data[con->fragment_size],
                                (unsigned char *)iovec[i].iov_base + copy_base, 
copy_len);
-                       mcast_packed_msg_lens[mcast_packed_msg_count] += 
copy_len;
+                       con->mcast_packed_msg_lens[con->mcast_packed_msg_count] 
+= copy_len;
 
                        /*
                         * if we're not on the last iovec or the iovec is too large to
@@ -941,25 +972,25 @@ static int mcast_msg (
                         */
                        if ((i < (iov_len - 1)) ||
                                        ((copy_base + copy_len) < 
iovec[i].iov_len)) {
-                               if (!next_fragment) {
-                                       next_fragment++;
+                               if (!con->next_fragment) {
+                                       con->next_fragment++;
                                }
-                               fragment_continuation = next_fragment;
-                               mcast.fragmented = next_fragment++;
-                               assert(fragment_continuation != 0);
+                               con->fragment_continuation = con->next_fragment;
+                               mcast.fragmented = con->next_fragment++;
+                               assert(con->fragment_continuation != 0);
                                assert(mcast.fragmented != 0);
                        } else {
-                               fragment_continuation = 0;
+                               con->fragment_continuation = 0;
                        }
 
                        /*
                         * assemble the message and send it
                         */
-                       mcast.msg_count = ++mcast_packed_msg_count;
+                       mcast.msg_count = ++con->mcast_packed_msg_count;
                        iovecs[0].iov_base = (void *)&mcast;
                        iovecs[0].iov_len = sizeof(struct totempg_mcast);
-                       iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
-                       iovecs[1].iov_len = mcast_packed_msg_count *
+                       iovecs[1].iov_base = (void *)con->mcast_packed_msg_lens;
+                       iovecs[1].iov_len = con->mcast_packed_msg_count *
                                sizeof(unsigned short);
                        iovecs[2].iov_base = (void *)data_ptr;
                        iovecs[2].iov_len = max_packet_size;
@@ -972,9 +1003,9 @@ static int mcast_msg (
                        /*
                         * Recalculate counts and indexes for the next.
                         */
-                       mcast_packed_msg_lens[0] = 0;
-                       mcast_packed_msg_count = 0;
-                       fragment_size = 0;
+                       con->mcast_packed_msg_lens[0] = 0;
+                       con->mcast_packed_msg_count = 0;
+                       con->fragment_size = 0;
                        max_packet_size = TOTEMPG_PACKET_SIZE - 
(sizeof(unsigned short));
 
                        /*
@@ -999,8 +1030,8 @@ static int mcast_msg (
         * the last buffer just fit into the fragmentation_data buffer
         * and we were at the last iovec.
         */
-       if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
-                       mcast_packed_msg_count++;
+       if (con->mcast_packed_msg_lens[con->mcast_packed_msg_count]) {
+                       con->mcast_packed_msg_count++;
        }
 
 error_exit:
@@ -1015,11 +1046,12 @@ static int msg_count_send_ok (
        int msg_count)
 {
        int avail = 0;
+       struct totempg_context *con = totempg_active_context;
 
        avail = totemmrp_avail ();
        totempg_stats.msg_queue_avail = avail;
 
-       return ((avail - totempg_reserved) > msg_count);
+       return ((avail - con->totempg_reserved) > msg_count);
 }
 
 static int byte_count_send_ok (
@@ -1039,10 +1071,11 @@ static int send_reserve (
        int msg_size)
 {
        unsigned int msg_count = 0;
+       struct totempg_context *con = totempg_active_context;
 
        msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct 
totempg_mcast) - 16)) + 1;
-       totempg_reserved += msg_count;
-       totempg_stats.msg_reserved = totempg_reserved;
+       con->totempg_reserved += msg_count;
+       totempg_stats.msg_reserved = con->totempg_reserved;
 
        return (msg_count);
 }
@@ -1050,8 +1083,10 @@ static int send_reserve (
 static void send_release (
        int msg_count)
 {
-       totempg_reserved -= msg_count;
-       totempg_stats.msg_reserved = totempg_reserved;
+       struct totempg_context *con = totempg_active_context;
+
+       con->totempg_reserved -= msg_count;
+       totempg_stats.msg_reserved = con->totempg_reserved;
 }
 
 int totempg_callback_token_create (
-- 
1.7.1

_______________________________________________
discuss mailing list
[email protected]
http://lists.corosync.org/mailman/listinfo/discuss

Reply via email to