Patch that adds code for synchronizing resource refcounts in the LCK
service.

The resource structure now contains an array of refcount_set
structures, one slot per node. Functions to manipulate this array are
included. The design is similar to the one used by the CKPT service.

Index: services/ckpt.c
===================================================================
--- services/ckpt.c     (revision 1685)
+++ services/ckpt.c     (working copy)
@@ -337,7 +337,6 @@
 static void exec_ckpt_sync_checkpoint_section_endian_convert (void *msg);
 static void exec_ckpt_sync_checkpoint_refcount_endian_convert (void *msg);
 
-
 static void ckpt_sync_init (void);
 static void ckpt_sync_activate (void);
 static int  ckpt_sync_process (void);
Index: services/lck.c
===================================================================
--- services/lck.c      (revision 1685)
+++ services/lck.c      (working copy)
@@ -72,8 +72,19 @@
        MESSAGE_REQ_EXEC_LCK_LOCKPURGE = 5,
        MESSAGE_REQ_EXEC_LCK_SYNC_RESOURCE = 6,
        MESSAGE_REQ_EXEC_LCK_SYNC_RESOURCE_LOCK = 7,
+       MESSAGE_REQ_EXEC_LCK_SYNC_RESOURCE_REFCOUNT = 8,
 };
 
+struct refcount_set { /* one per-node reference count slot of a resource */
+       unsigned int refcount; /* reference count attributed to nodeid */
+       unsigned int nodeid; /* node id for this slot; 0 marks an unused slot */
+};
+
+typedef struct { /* message form of struct refcount_set, carried in req_exec_lck_sync_resource_refcount */
+       unsigned int refcount __attribute__((aligned(8))); /* copied from refcount_set.refcount */
+       unsigned int nodeid __attribute__((aligned(8))); /* copied from refcount_set.nodeid */
+} mar_refcount_set_t;
+
 struct resource;
 
 struct resource_lock {
@@ -101,6 +112,7 @@
        struct list_head pr_pending_list_head;
        struct list_head ex_pending_list_head;
        struct resource_lock *ex_granted;
+       struct refcount_set refcount_set[PROCESSOR_COUNT_MAX];
 };
 
 struct resource_cleanup {
@@ -162,6 +174,10 @@
        void *message,
        unsigned int nodeid);
 
+static void message_handler_req_exec_lck_sync_resource_refcount (
+       void *message,
+       unsigned int nodeid);
+
 static void message_handler_req_lib_lck_resourceopen (
        void *conn,
        void *msg);
@@ -204,12 +220,20 @@
 static void exec_lck_resource_lock_endian_convert (void *msg);
 static void exec_lck_sync_resource_endian_convert (void *msg);
 static void exec_lck_sync_resource_lock_endian_convert (void *msg);
+static void exec_lck_sync_resource_refcount_endian_convert (void *msg);
 
 static void lck_sync_init (void);
 static int  lck_sync_process (void);
 static void lck_sync_activate (void);
 static void lck_sync_abort (void);
 
+static void sync_refcount_increment (
+       struct resource *resource, unsigned int nodeid);
+static void sync_refcount_decrement (
+       struct resource *resource, unsigned int nodeid);
+static void sync_refcount_calculate (
+       struct resource *resource);
+
 void resource_release (struct resource *resource);
 void resource_lock_release (struct resource_lock *resource_lock);
 
@@ -324,6 +348,10 @@
                .exec_handler_fn        = message_handler_req_exec_lck_sync_resource_lock,
                .exec_endian_convert_fn = exec_lck_sync_resource_lock_endian_convert
        },
+       {
+               .exec_handler_fn        = message_handler_req_exec_lck_sync_resource_refcount,
+               .exec_endian_convert_fn = exec_lck_sync_resource_refcount_endian_convert
+       },
 };
 
 struct corosync_service_engine lck_service_engine = {
@@ -560,6 +588,18 @@
        to_swab->timeout = swab64 (to_swab->timeout);
 }
 
+struct req_exec_lck_sync_resource_refcount { /* executive message carrying one resource's per-node refcounts */
+       mar_req_header_t header __attribute__((aligned(8))); /* size and service/message id */
+       struct memb_ring_id ring_id __attribute__((aligned(8))); /* sender's saved ring id; receivers compare it to their own */
+       mar_name_t resource_name __attribute__((aligned(8))); /* name used to look the resource up on the receiver */
+       mar_refcount_set_t refcount_set[PROCESSOR_COUNT_MAX] __attribute__((aligned(8))); /* full slot table, unused slots included */
+};
+
+static void exec_lck_sync_resource_refcount_endian_convert (void *msg)
+{
+       return; /* no-op: msg is not byte-swapped. NOTE(review): confirm peers of another endianness are handled */
+}
+
 static void print_resource_lock_list (struct resource *resource)
 {
        struct list_head *list;
@@ -694,6 +734,57 @@
        free (resource_lock);
 }
 
+static void sync_refcount_increment (
+       struct resource *resource,
+       unsigned int nodeid)
+{
+       struct refcount_set *slot = resource->refcount_set;
+       struct refcount_set *const end = slot + PROCESSOR_COUNT_MAX;
+       /* Claim the first free slot (nodeid 0) or bump nodeid's slot; a full table is left unchanged. */
+       for (; slot != end; slot++) {
+               if (slot->nodeid == 0) {
+                       slot->nodeid = nodeid;
+                       slot->refcount = 1;
+                       return;
+               } else if (slot->nodeid == nodeid) {
+                       slot->refcount += 1;
+                       return;
+               }
+       }
+}
+
+static void sync_refcount_decrement (
+       struct resource *resource,
+       unsigned int nodeid)
+{
+       struct refcount_set *slot = resource->refcount_set;
+       unsigned int remaining = PROCESSOR_COUNT_MAX;
+
+       /* Walk the used slots only; a slot with nodeid 0 ends the used region. */
+       while (remaining-- > 0 && slot->nodeid != 0) {
+               if (slot->nodeid == nodeid) {
+                       slot->refcount -= 1;
+                       return;
+               }
+               slot++;
+       }
+}
+
+static void sync_refcount_calculate (
+       struct resource *resource)
+{
+       unsigned int i;
+
+       /* Recompute resource->refcount as the sum of all used per-node slots. */
+       resource->refcount = 0;
+       for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
+               if (resource->refcount_set[i].nodeid == 0) {
+                       break; /* nodeid 0 ends the used slots */
+               }
+               resource->refcount += resource->refcount_set[i].refcount;
+       }
+}
+
 static inline void sync_resource_free (struct list_head *head)
 {
        struct resource *resource;
@@ -777,6 +868,39 @@
        return (api->totem_mcast (&iovec, 1, TOTEM_AGREED));
 }
 
+static int sync_resource_refcount_transmit (
+       struct resource *resource)
+{
+       struct req_exec_lck_sync_resource_refcount req_exec_lck_sync_resource_refcount;
+       struct iovec iovec;
+       unsigned int i;
+       /* Multicast this resource's refcount table; returns the totem_mcast result. */
+       memset (&req_exec_lck_sync_resource_refcount, 0,
+               sizeof (struct req_exec_lck_sync_resource_refcount));
+
+       req_exec_lck_sync_resource_refcount.header.size =
+               sizeof (struct req_exec_lck_sync_resource_refcount);
+       req_exec_lck_sync_resource_refcount.header.id =
+               SERVICE_ID_MAKE (LCK_SERVICE, MESSAGE_REQ_EXEC_LCK_SYNC_RESOURCE_REFCOUNT);
+       /* The ring id lets receivers drop messages that belong to another ring. */
+       memcpy (&req_exec_lck_sync_resource_refcount.ring_id,
+               &my_saved_ring_id, sizeof (struct memb_ring_id));
+       memcpy (&req_exec_lck_sync_resource_refcount.resource_name,
+               &resource->name, sizeof (mar_name_t));
+       /* Copy the whole per-node table, unused slots included. */
+       for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
+               req_exec_lck_sync_resource_refcount.refcount_set[i].refcount =
+                       resource->refcount_set[i].refcount;
+               req_exec_lck_sync_resource_refcount.refcount_set[i].nodeid =
+                       resource->refcount_set[i].nodeid;
+       }
+
+       iovec.iov_base = (char *)&req_exec_lck_sync_resource_refcount;
+       iovec.iov_len = sizeof (struct req_exec_lck_sync_resource_refcount);
+
+       return (api->totem_mcast (&iovec, 1, TOTEM_AGREED));
+}
+
 static int sync_resource_iterate (void)
 {
        struct resource *resource;
@@ -814,6 +938,27 @@
        return (res);
 }
 
+static int sync_refcount_iterate (void)
+{
+       struct list_head *pos = resource_list_head.next;
+       unsigned int res = 0;
+
+       /*
+        * Transmit the refcount table of every resource on resource_list_head;
+        * stop at the first transmit that returns non-zero and return that value.
+        */
+       while (pos != &resource_list_head) {
+               struct resource *cur = list_entry (pos, struct resource, list);
+               res = sync_resource_refcount_transmit (cur);
+               if (res != 0) {
+                       break;
+               }
+               pos = pos->next;
+       }
+
+       return (res);
+}
+
 static void lck_sync_init (void)
 {
        return;
@@ -829,6 +974,12 @@
                res = sync_resource_iterate ();
        }
 
+       if (my_lowest_nodeid == api->totem_nodeid_get ()) {
+               TRACE1 ("transmit refcounts because lowest member in old configuration.\n");
+
+               sync_refcount_iterate ();
+       }
+
        return (0);
 }
 
@@ -1201,8 +1352,16 @@
 
                resource->refcount = 0;
                resource->ex_granted = NULL;
+
+               memset (&resource->refcount_set, 0,
+                       sizeof (struct refcount_set) * PROCESSOR_COUNT_MAX);
        }
 
+       log_printf (LOG_LEVEL_DEBUG, "RESOURCE opened is %p\n", resource);
+
+       sync_refcount_increment (resource, nodeid);
+       sync_refcount_calculate (resource);
+
        if (api->ipc_source_is_local (&req_exec_lck_resourceopen->source)) {
                resource_cleanup = malloc (sizeof (struct resource_cleanup));
                if (resource_cleanup == NULL) {
@@ -1280,8 +1439,11 @@
                goto error_exit;
        }
 
-       resource->refcount -= 1;
+       /* resource->refcount -= 1; */
 
+       sync_refcount_decrement (resource, nodeid);
+       sync_refcount_calculate (resource);
+
        if (resource->refcount == 0) {
                /* TODO */
        }
@@ -1920,6 +2082,56 @@
        }
 }
 
+static void message_handler_req_exec_lck_sync_resource_refcount (
+       void *message,
+       unsigned int nodeid)
+{
+       struct req_exec_lck_sync_resource_refcount *req_exec_lck_sync_resource_refcount
+               = (struct req_exec_lck_sync_resource_refcount *)message;
+       struct resource *resource;
+       unsigned int i, j;
+       /* Sync handler: fold the per-node refcounts of one received resource into the local copy. */
+       /*
+        * Ignore messages from previous ring
+        */
+       if (memcmp (&req_exec_lck_sync_resource_refcount->ring_id,
+                   &my_saved_ring_id, sizeof (struct memb_ring_id)) != 0)
+       {
+               return;
+       }
+       /* The named resource must already be on sync_resource_list_head (asserted below). */
+       resource = lck_resource_find (&sync_resource_list_head,
+               &req_exec_lck_sync_resource_refcount->resource_name);
+
+       assert (resource != NULL);
+       /* Merge each used message slot: take the first free local slot or add to the matching one. */
+       for (i = 0; i < PROCESSOR_COUNT_MAX; i++) {
+               if (req_exec_lck_sync_resource_refcount->refcount_set[i].nodeid == 0) {
+                       break;
+               }
+               for (j = 0; j < PROCESSOR_COUNT_MAX; j++) {
+                       if (resource->refcount_set[j].nodeid == 0) {
+                               resource->refcount_set[j].nodeid =
+                                       req_exec_lck_sync_resource_refcount->refcount_set[i].nodeid;
+                               resource->refcount_set[j].refcount =
+                                       req_exec_lck_sync_resource_refcount->refcount_set[i].refcount;
+                               break;
+                       }
+                       if (req_exec_lck_sync_resource_refcount->refcount_set[i].nodeid == resource->refcount_set[j].nodeid) {
+                               resource->refcount_set[j].refcount +=
+                                       req_exec_lck_sync_resource_refcount->refcount_set[i].refcount;
+                               break;
+                       }
+               }
+       }
+
+       sync_refcount_calculate (resource);
+
+       /* DEBUG: report the recomputed total for this resource */
+       log_printf (LOG_LEVEL_DEBUG, "[DEBUG]: sync refcount for resource %s is %u\n",
+                   get_mar_name_t (&resource->name), (unsigned int)(resource->refcount));
+}
+
 static void message_handler_req_lib_lck_resourceopen (
        void *conn,
        void *msg)
Index: lib/lck.c
===================================================================
--- lib/lck.c   (revision 1685)
+++ lib/lck.c   (working copy)
@@ -282,7 +282,6 @@
 
        pthread_mutex_init (&lckInstance->response_mutex, NULL);
 
-
        saHandleInstancePut (&lckHandleDatabase, *lckHandle);
 
        return (SA_AIS_OK);
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to