CVSROOT:        /cvs/cluster
Module name:    cluster
Branch:         RHEL5
Changes by:     [EMAIL PROTECTED]       2008-02-08 14:30:10

Modified files:
        cmirror/src    : cluster.c cluster.h functions.c functions.h 
                         local.c 

Log message:
        - stop delaying disk log writes
        - stop placing requests into the startup queue before initial config
        - added recovering_region to the checkpoint data so that the same
        region is not assigned for syncing more than once.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.15&r2=1.1.2.16
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14

--- cluster/cmirror/src/Attic/cluster.c 2008/02/06 23:03:05     1.1.2.15
+++ cluster/cmirror/src/Attic/cluster.c 2008/02/08 14:30:10     1.1.2.16
@@ -27,6 +27,13 @@
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
 
+#define DEBUGGING_HISTORY 20
+static char debugging[DEBUGGING_HISTORY][128];
+static int idx = 0;
+static int memberz = 0;
+static int doit = 0;
+
+
 struct checkpoint_data {
        uint32_t requester;
        char uuid[CPG_MAX_NAME_LENGTH];
@@ -34,6 +41,7 @@
        int bitmap_size; /* in bytes */
        char *sync_bits;
        char *clean_bits;
+       char *recovering_region;
        struct checkpoint_data *next;
 };     
 
@@ -58,44 +66,18 @@
 static struct list_head clog_cpg_list;
 
 /*
- * flow_control
- * @handle
- *
- * Returns: 1 if flow control needed, 0 otherwise
- */
-static int flow_control(cpg_handle_t handle)
-{
-       cpg_flow_control_state_t flow_control_state;
-       cpg_error_t error;
-       
-       /* FIXME: no flow control for now (cmirror should self regulate) */
-       return 0;
-
-       error = cpg_flow_control_state_get(handle, &flow_control_state);
-       if (error != CPG_OK) {
-               LOG_ERROR("Failed to get flow control state.  Reason: %d", 
error);
-               /* FIXME: Better error handling */
-               return 0;
-       }
-
-       return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0;
-}
-
-/*
  * cluster_send
  * @tfr
  *
  * Returns: 0 on success, -Exxx on error
  */
-int cluster_send(struct clog_tfr *tfr)
+static int cluster_send(struct clog_tfr *tfr)
 {
        int r;
        int found;
        struct iovec iov;
        struct clog_cpg *entry, *tmp;
 
-       ENTER();
-       
        list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list)
                if (!strncmp(entry->name.value, tfr->uuid, 
CPG_MAX_NAME_LENGTH)) {
                        found = 1;
@@ -104,26 +86,35 @@
 
        if (!found) {
                tfr->error = -ENOENT;
-               EXIT();
                return -ENOENT;
        }
 
        iov.iov_base = tfr;
        iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
-       while (flow_control(entry->handle)) {
-               /*
-                * FIXME: Don't need to sleep this long
-                *
-                * ... or, we could dispatch the queued messages here.
-                */
-               LOG_PRINT("Flow control enabled.  Delaying msg [%s]",
-                         RQ_TYPE(tfr->request_type));
-               sleep(1);
-       }
+
        r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
+       if (r == CPG_OK)
+               return 0;
+       if (r == SA_AIS_ERR_TRY_AGAIN)
+               return -EAGAIN;
+
+       LOG_ERROR("cpg_mcast_joined error: %d", r);
+
+       tfr->error = -EBADE;
+       return -EBADE;
+}
+
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char 
*function)
+{
+       int r;
+
+       do {
+               r = cluster_send(tfr);
+               if (r)
+                       LOG_ERROR("cluster_send failed at: %s:%d (%s)",
+                                 file, line, function);
+       } while (r == -EAGAIN);
 
-       EXIT();
-       tfr->error = r = (r == CPG_OK) ? 0 : -EBADE;
        return r;
 }
 
@@ -137,7 +128,7 @@
        return r;
 }
 
-static int handle_cluster_request(struct clog_tfr *tfr, int server)
+static int handle_cluster_request(struct clog_tfr *tfr, int server, int printz)
 {
        int r = 0;
 
@@ -152,27 +143,22 @@
         */
        if ((tfr->request_type != DM_CLOG_RESUME) ||
            (tfr->originator == my_cluster_id))
-               r = do_request(tfr);
+               r = do_request(tfr, server);
 
        if (server) {
-               if (r)
-                       LOG_ERROR("do_request failed, unable to commit log");
-               else
-                       r = commit_log(tfr);
-
                tfr->request_type |= DM_CLOG_RESPONSE;
 
                /*
                 * Errors from previous functions are in the tfr struct.
                 */
-
-               LOG_DBG("Sending response to %u on cluster: [%s/%llu]",
-                       tfr->originator,
-                       RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
-                       (unsigned long long)tfr->seq);
+               if (printz)
+                       LOG_DBG("[%s] Sending response to %u on cluster: 
[%s/%llu]",
+                               SHORT_UUID(tfr->uuid), tfr->originator,
+                               RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+                               (unsigned long long)tfr->seq);
                r = cluster_send(tfr);
                if (r)
-                       LOG_ERROR("cluster_send failed");
+                       LOG_ERROR("cluster_send failed: %s", strerror(-r));
        }
 
        EXIT();
@@ -209,6 +195,8 @@
                INIT_LIST_HEAD(&l);
                queue_remove_all(&l, cluster_queue);
                LOG_ERROR("Current list:");
+               if (list_empty(&l))
+                       LOG_ERROR("   [none]");
                list_for_each_safe(p, n, &l) {
                        list_del_init(p);
                        t = (struct clog_tfr *)p;
@@ -257,6 +245,7 @@
 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
                                                  uint32_t cp_requester)
 {
+       int r;
        struct checkpoint_data *new;
 
        new = malloc(sizeof(*new));
@@ -270,7 +259,7 @@
        strncpy(new->uuid, entry->name.value, entry->name.length);
 
        if (entry->valid) {
-               new->bitmap_size = store_bits(entry->name.value, "clean_bits",
+               new->bitmap_size = push_state(entry->name.value, "clean_bits",
                                              &new->clean_bits);
                if (new->bitmap_size <= 0) {
                        LOG_ERROR("Failed to store clean_bits to checkpoint for 
node %u",
@@ -279,7 +268,7 @@
                        return NULL;
                }
 
-               new->bitmap_size = store_bits(entry->name.value,
+               new->bitmap_size = push_state(entry->name.value,
                                              "sync_bits", &new->sync_bits);
                if (new->bitmap_size <= 0) {
                        LOG_ERROR("Failed to store sync_bits to checkpoint for 
node %u",
@@ -288,6 +277,16 @@
                        free(new);
                        return NULL;
                }
+
+               r = push_state(entry->name.value, "recovering_region", 
&new->recovering_region);
+               if (r <= 0) {
+                       LOG_ERROR("Failed to store recovering_region to 
checkpoint for node %u",
+                                 new->requester);
+                       free(new->sync_bits);
+                       free(new->clean_bits);
+                       free(new);
+                       return NULL;
+               }
        } else {
                /*
                 * We can store bitmaps yet, because the log is not
@@ -309,6 +308,7 @@
  */
 static void free_checkpoint(struct checkpoint_data *cp)
 {
+       free(cp->recovering_region);
        free(cp->sync_bits);
        free(cp->clean_bits);
        free(cp);
@@ -335,9 +335,9 @@
        name.length = len;
 
        attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
-       attr.checkpointSize = cp->bitmap_size * 2;
+       attr.checkpointSize = cp->bitmap_size * 2 + 
strlen(cp->recovering_region) + 1;
        attr.retentionDuration = SA_TIME_MAX;
-       attr.maxSections = 3;      /* don't know why we need +1 */
+       attr.maxSections = 4;      /* don't know why we need +1 */
        attr.maxSectionSize = cp->bitmap_size;
        attr.maxSectionIdSize = 22;
 
@@ -363,6 +363,7 @@
                EXIT();
                return -EIO; /* FIXME: better error */
        }
+
        /*
         * Add section for sync_bits
         */
@@ -408,7 +409,7 @@
        }
 
        if (rv == SA_AIS_ERR_EXIST) {
-               LOG_ERROR("export_checkpoint: clean checkpoint section already 
exists");
+               LOG_DBG("export_checkpoint: clean checkpoint section already 
exists");
                EXIT();
                return -EEXIST;
        }
@@ -419,6 +420,35 @@
                return -EIO; /* FIXME: better error */
        }
 
+       /*
+        * Add section for recovering_region
+        */
+       section_id.idLen = snprintf(buf, 32, "recovering_region");
+       section_id.id = (unsigned char *)buf;
+       section_attr.sectionId = &section_id;
+       section_attr.expirationTime = SA_TIME_END;
+
+rr_create_retry:
+       rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
+                                strlen(cp->recovering_region) + 1);
+       if (rv == SA_AIS_ERR_TRY_AGAIN) {
+               LOG_ERROR("export_checkpoint: RR create retry");
+               sleep(1);
+               goto rr_create_retry;
+       }
+
+       if (rv == SA_AIS_ERR_EXIST) {
+               LOG_DBG("export_checkpoint: RR checkpoint section already 
exists");
+               EXIT();
+               return -EEXIST;
+       }
+
+       if (rv != SA_AIS_OK) {
+               LOG_ERROR("export_checkpoint: RR checkpoint section creation 
failed");
+               EXIT();
+               return -EIO; /* FIXME: better error */
+       }
+
        LOG_DBG("export_checkpoint: closing checkpoint");
        saCkptCheckpointClose(h);
 
@@ -515,7 +545,7 @@
                        break;
        }
        saCkptSectionIterationFinalize(itr);
-       if (len != 2) {
+       if (len != 3) {
                LOG_ERROR("import_checkpoint: %d checkpoint sections found", 
len);
                sleep(1);
                goto init_retry;
@@ -572,8 +602,9 @@
                */
 
                if (iov.readSize) {
-                       if (load_bits(entry->name.value, (char 
*)desc.sectionId.id, bitmap, iov.readSize)) {
-                               LOG_ERROR("Error loading bits");
+                       if (pull_state(entry->name.value, (char 
*)desc.sectionId.id, bitmap,
+                                      iov.readSize)) {
+                               LOG_ERROR("Error loading state");
                                rtn = -EIO;
                                goto fail;
                        }
@@ -645,6 +676,7 @@
        int i;
        int r = 0;
        int i_am_server;
+       int response = 0;
        struct clog_tfr *tfr = msg;
        struct clog_tfr *startup_tfr = NULL;
        struct clog_cpg *match;
@@ -665,7 +697,9 @@
                        (unsigned long long)tfr->seq);
 
        if (my_cluster_id == 0xDEAD) {
-               LOG_DBG("Message before init... ignoring.\n");
+               LOG_DBG("[%s]  Message from %u before init [%s/%llu]",
+                       SHORT_UUID(tfr->uuid), nodeid,
+                       RQ_TYPE(tfr->request_type), (unsigned long long) 
tfr->seq);
                return;
        }
 
@@ -674,6 +708,14 @@
                LOG_ERROR("Unable to find clog_cpg for cluster message");
                return;
        }
+
+       if (match->lowest_id == 0xDEAD) {
+               LOG_DBG("[%s]  Message from %u before init* [%s/%llu]",
+                       SHORT_UUID(tfr->uuid), nodeid,
+                       RQ_TYPE(tfr->request_type), (unsigned long long) 
tfr->seq);
+               return;
+       }
+
        i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
 
        if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
@@ -691,7 +733,7 @@
                                        LOG_DBG("Processing delayed request %d: 
%s",
                                                match->startup_queue->count,
                                                
RQ_TYPE(startup_tfr->request_type));
-                                       r = handle_cluster_request(startup_tfr, 
i_am_server);
+                                       r = handle_cluster_request(startup_tfr, 
i_am_server, 1);
 
                                        if (r) {
                                                LOG_ERROR("Error while 
processing delayed CPG message");
@@ -732,9 +774,10 @@
                match->checkpoint_list = new;
        }
 
-       if (tfr->request_type & DM_CLOG_RESPONSE)
+       if (tfr->request_type & DM_CLOG_RESPONSE) {
+               response = 1;
                r = handle_cluster_response(tfr);
-       else {
+       } else {
                tfr->originator = nodeid;
 
                if (!match->valid) {
@@ -757,15 +800,40 @@
                        goto out;
                }
 
-               r = handle_cluster_request(tfr, i_am_server);
+               r = handle_cluster_request(tfr, i_am_server,
+                                          ((memberz != 4) || (--doit > 0)));
        }
 
 out:
        if (r) {
-               LOG_ERROR("[%s] Error while processing CPG message, %s: %d",
+               LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
                          SHORT_UUID(tfr->uuid),
                          RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
-                         r);
+                         strerror(-r));
+               LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(tfr->uuid),
+                         (response) ? "YES" : "NO");
+               LOG_ERROR("[%s]    Originator: %u", SHORT_UUID(tfr->uuid), 
tfr->originator);
+               if (response)
+                       LOG_ERROR("[%s]    Responder : %u", 
SHORT_UUID(tfr->uuid), nodeid);
+               LOG_ERROR("HISTORY::");
+
+               for (i = 0; i < DEBUGGING_HISTORY; i++) {
+                       idx++;
+                       idx = idx % DEBUGGING_HISTORY;
+                       if (debugging[idx][0] == '\0')
+                               continue;
+                       LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]);
+               }
+       } else if (!(tfr->request_type & DM_CLOG_RESPONSE)) {
+               int len;
+               idx++;
+               idx = idx % DEBUGGING_HISTORY;
+               len = sprintf(debugging[idx], "SEQ#=%llu, UUID=%s, TYPE=%s, 
ORIG=%u, RESP=%s",
+                             (unsigned long long)tfr->seq, 
SHORT_UUID(tfr->uuid),
+                             RQ_TYPE(tfr->request_type),
+                             tfr->originator, (response) ? "YES" : "NO");
+               if (response)
+                       sprintf(debugging[idx] + len, ", RSPR=%u", nodeid);
        }
        EXIT();
 }
@@ -779,10 +847,12 @@
        int my_pid = getpid();
        int found = 0;
        struct clog_cpg *match, *tmp;
-       uint32_t lowest;
+       uint32_t lowest = 0xDEAD;
 
        ENTER();
 
+       memberz = member_list_entries;
+
        LOG_DBG("****** CPG config callback **[%s]**",
                SHORT_UUID(gname->value));
 
@@ -821,6 +891,8 @@
                goto out;
        }
 
+       lowest = match->lowest_id;
+
        /* Am I leaving? */
        for (i = 0; i < left_list_entries; i++)
                if (my_cluster_id == left_list[i].nodeid) {
@@ -863,6 +935,7 @@
 
                        free(match->startup_queue);
                        match->free_me = 1;
+                       match->lowest_id = 0xDEAD;
 
                        goto out;
                }                       
@@ -871,8 +944,6 @@
        if (!left_list_entries &&
            (member_list_entries == 1) && (joined_list_entries == 1) &&
            (member_list[0].nodeid == joined_list[0].nodeid)) {
-               LOG_DBG("[%s]  I am the log server (and first to join)",
-                       SHORT_UUID(match->name.value));
                match->lowest_id = my_cluster_id = joined_list[0].nodeid;
                match->valid = 1;
                goto out;
@@ -894,17 +965,15 @@
                }
        }
 
-       lowest = match->lowest_id;
+       if (member_list_entries)
+               match->lowest_id = member_list[0].nodeid;
+       else
+               match->lowest_id = 0xDEAD;
        /* Find the lowest_id, i.e. the server */
-       for (i = 0, match->lowest_id = member_list[0].nodeid;
-            i < member_list_entries; i++)
+       for (i = 0; i < member_list_entries; i++)
                if (match->lowest_id > member_list[i].nodeid)
                        match->lowest_id = member_list[i].nodeid;
 
-       if (lowest != match->lowest_id)
-               LOG_DBG("[%s]  Server is now %u", SHORT_UUID(match->name.value),
-                       match->lowest_id);
-
        /*
         * If I am part of the joining list, I do not send checkpoints
         * FIXME: What are the cases where multiple nodes can join?
@@ -920,6 +989,21 @@
        match->checkpoints_needed += i;
 
 out:
+       if (lowest != match->lowest_id)
+               LOG_DBG("[%s]  Server change %u -> %u (%u %s)",
+                       SHORT_UUID(match->name.value),
+                       lowest, match->lowest_id,
+                       (joined_list_entries) ? joined_list[0].nodeid : 
left_list[0].nodeid,
+                       (joined_list_entries && (member_list_entries == 1)) ? 
+                       "is first to join" : (joined_list_entries) ? "joined" : 
"left");
+       else
+               LOG_DBG("[%s]  Server unchanged at %u (%u %s)",
+                       SHORT_UUID(match->name.value), lowest,
+                       (joined_list_entries) ? joined_list[0].nodeid : 
left_list[0].nodeid,
+                       (joined_list_entries) ? "joined" : "left");
+
+       if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id))
+               doit = 25;
        EXIT();
 }
 
@@ -1019,6 +1103,12 @@
 
        ENTER();
 
+       {
+               int i;
+               for(i = 0; i < DEBUGGING_HISTORY; i++)
+                       debugging[i][0] = '\0';
+       }
+
        INIT_LIST_HEAD(&clog_cpg_list);
        rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
 
--- cluster/cmirror/src/Attic/cluster.h 2008/01/23 21:21:06     1.1.2.2
+++ cluster/cmirror/src/Attic/cluster.h 2008/02/08 14:30:10     1.1.2.3
@@ -7,6 +7,7 @@
 int create_cluster_cpg(char *str);
 int destroy_cluster_cpg(char *str);
 
-int cluster_send(struct clog_tfr *tfr);
+int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char 
*function);
+#define cluster_send(x) cluster_send_helper((x), __LINE__, __FILE__, 
__FUNCTION__)
 
 #endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */
--- cluster/cmirror/src/Attic/functions.c       2008/02/06 23:03:05     1.1.2.13
+++ cluster/cmirror/src/Attic/functions.c       2008/02/08 14:30:10     1.1.2.14
@@ -33,14 +33,6 @@
         uint64_t nr_regions;
 };
 
-/*
- * Used by the 'touched' variable, these macros mean:
- *   LOG_CHANGED - bits in the in-memory log have changed
- *   LOG_FLUSH   - log must be committed to disk
- */
-#define LOG_CHANGED 1
-#define LOG_FLUSH   2
-
 struct log_c {
        struct list_head list;
 
@@ -103,13 +95,13 @@
 static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
 {
        ext2fs_set_bit(bit, (unsigned int *) bs);
-       lc->touched |= LOG_CHANGED;
+       lc->touched = 1;
 }
 
 static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit)
 {
        ext2fs_clear_bit(bit, (unsigned int *) bs);
-       lc->touched |= LOG_CHANGED;
+       lc->touched = 1;
 }
 
 /* FIXME: Why aren't count and start the same type? */
@@ -205,7 +197,7 @@
                if (r < 0) {
                        LOG_ERROR("rw_log:  write failure: %s",
                                  strerror(errno));
-                       return -EIO;
+                       return -EIO; /* Failed disk write */
                }
                return 0;
        }
@@ -216,7 +208,7 @@
                LOG_ERROR("rw_log:  read failure: %s",
                          strerror(errno));
        if (r != lc->disk_size)
-               return -EIO;
+               return -EIO; /* Failed disk read */
        return 0;
 }
 
@@ -239,7 +231,7 @@
        memset(&lh, 0, sizeof(struct log_header));
 
        if (rw_log(lc, 0))
-               return -EIO;
+               return -EIO; /* Failed disk read */
 
        header_from_disk(&lh, lc->disk_buffer);
        if (lh.magic != MIRROR_MAGIC) {
@@ -285,8 +277,10 @@
        bitset_size += (lc->region_count % 8) ? 1 : 0;
        memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size);
 
-       if (rw_log(lc, 1))
-               return -EIO;
+       if (rw_log(lc, 1)) {
+               lc->log_dev_failed = 1;
+               return -EIO; /* Failed disk write */
+       }
        return 0;
 }
 
@@ -697,6 +691,7 @@
 static int clog_resume(struct clog_tfr *tfr)
 {
        uint32_t i;
+       int commit_log = 0;
        struct log_c *lc = get_log(tfr->uuid);
        size_t size = lc->bitset_uint32_count * sizeof(uint32_t);
 
@@ -715,6 +710,7 @@
                LOG_DBG("[%s] Master resume: reading disk log",
                          SHORT_UUID(lc->uuid));
                lc->resume_override = 1000;
+               commit_log = 1;
                break;
        case 1:
                LOG_ERROR("Error:: partial bit loading (just sync_bits)");
@@ -782,11 +778,14 @@
                SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count);
        lc->sync_search = 0;
 
-       /*
-        * We mark 'touched' as LOG_FLUSH so that only the master commits
-        * the log via 'commit_log'
-        */
-       lc->touched = LOG_FLUSH;
+       if (commit_log && (lc->disk_fd >= 0)) {
+               tfr->error = write_log(lc);
+               if (tfr->error)
+                       LOG_ERROR("Failed initial disk log write");
+               else
+                       LOG_DBG("Disk log initialized");
+               lc->touched = 0;
+       }
 out:
        lc->state = LOG_RESUMED;
        lc->recovery_halted = 0;
@@ -917,20 +916,34 @@
  * @tfr
  *
  */
-static int clog_flush(struct clog_tfr *tfr)
+static int clog_flush(struct clog_tfr *tfr, int server)
 {
+       int r = 0;
        struct log_c *lc = get_log(tfr->uuid);
-
+       
        if (!lc)
                return -EINVAL;
 
-       /* 
-        * Actual disk flush happens in 'commit_log()'
-        * Clear LOG_CHANGED and set LOG_FLUSH
+       if (!lc->touched)
+               return 0;
+
+       /*
+        * Do the actual flushing of the log only
+        * if we are the server.
         */
-       lc->touched = LOG_FLUSH;
+       if (server && (lc->disk_fd >= 0)) {
+               r = tfr->error = write_log(lc);
+               if (r) {
+                       LOG_ERROR("Error writing to disk log");
+                       return r;
+               }
+               LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
+       }
+
+       lc->touched = 0;
 
        return 0;
+
 }
 
 /*
@@ -1179,14 +1192,18 @@
 
        if (pkg->in_sync) {
                if (log_test_bit(lc->sync_bits, pkg->region)) {
-                       LOG_PRINT("  Region already in-sync: %llu",
-                                 (unsigned long long)pkg->region);
+                       LOG_ERROR("[%s]  Region already in-sync: region=%llu, 
seq=%llu, who=%u",
+                                 SHORT_UUID(lc->uuid),
+                                 (unsigned long long)pkg->region,
+                                 (unsigned long long)tfr->seq,
+                                 tfr->originator);
                } else {
                        log_set_bit(lc, lc->sync_bits, pkg->region);
                        lc->sync_count++;
-                       LOG_DBG("[%s] sync_count = %llu, Region %llu marked 
in-sync by %u",
+                       LOG_DBG("[%s] sync_count=%llu, Region %llu marked 
in-sync by %u, seq=%llu",
                                SHORT_UUID(lc->uuid), (unsigned long 
long)lc->sync_count,
-                               (unsigned long long)pkg->region, 
tfr->originator);
+                               (unsigned long long)pkg->region, 
tfr->originator,
+                               (unsigned long long)tfr->seq);
                }
        } else if (log_test_bit(lc->sync_bits, pkg->region)) {
                lc->sync_count--;
@@ -1249,7 +1266,7 @@
 
        tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
                                 major(statbuf.st_rdev), minor(statbuf.st_rdev),
-                                'A'); /* FIXME: detect dead device */
+                                (lc->log_dev_failed) ? 'D' : 'A');
 
        return 0;
 }
@@ -1396,6 +1413,7 @@
 /*
  * do_request
  * @tfr: the request
+ * @server: is this request performed by the server
  *
  * An inability to perform this function will return an error
  * from this function.  However, an inability to successfully
@@ -1403,7 +1421,7 @@
  *
  * Returns: 0 on success, -EXXX on error
  */
-int do_request(struct clog_tfr *tfr)
+int do_request(struct clog_tfr *tfr, int server)
 {
        int r;
 
@@ -1442,7 +1460,7 @@
                r = clog_in_sync(tfr);
                break;
        case DM_CLOG_FLUSH:
-               r = clog_flush(tfr);
+               r = clog_flush(tfr, server);
                break;
        case DM_CLOG_MARK_REGION:
                r = clog_mark_region(tfr);
@@ -1489,52 +1507,6 @@
        return 0;
 }
 
-/*
- * commit_log
- * @tfr: commit log associated with this request
- *
- * This function will also set 'tfr->error' on failure
- *
- * Returns: 0 on success, -EXXX on error
- */
-int commit_log(struct clog_tfr *tfr)
-{
-       int r = 0;
-       struct log_c *lc;
-
-       ENTER();
-
-       lc = get_log(tfr->uuid);
-       
-       if (!lc) {
-               LOG_DBG("No log found");
-               tfr->error = -EINVAL;
-               r = -EINVAL;
-               goto out;
-       }
-
-       if (!(lc->touched & LOG_FLUSH))
-               goto out;
-
-       if (lc->disk_fd >= 0) {
-               r = tfr->error = write_log(lc);
-               if (r) {
-                       LOG_ERROR("Error writing to disk log");
-                       return -EIO;
-               }
-               LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
-       }
-
-       if (lc->touched & LOG_CHANGED)
-               LOG_ERROR("WARNING:  Log has changed during a flush operation");
-
-       lc->touched &= ~LOG_FLUSH;
-
-out:
-       EXIT();
-       return 0;
-}
-
 static void print_bits(char *buf, int size)
 {
 #ifdef DEBUG
@@ -1556,7 +1528,8 @@
 #endif
 }
 
-int store_bits(const char *uuid, const char *which, char **buf)
+/* int store_bits(const char *uuid, const char *which, char **buf)*/
+int push_state(const char *uuid, const char *which, char **buf)
 {
        int bitset_size;
        struct log_c *lc;
@@ -1570,8 +1543,18 @@
                return -EINVAL;
        }
 
+       if (!strcmp(which, "recovering_region")) {
+               *buf = malloc(32); /* easily covers largest 64-bit int */
+               if (!*buf)
+                       return -ENOMEM;
+               sprintf(*buf, "%llu", (unsigned long 
long)lc->recovering_region);
+
+               return 32;
+       }
+
        bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
        *buf = malloc(bitset_size);
+
        if (!*buf) {
                LOG_ERROR("store_bits: Unable to allocate memory");
                return -ENOMEM;
@@ -1590,23 +1573,33 @@
        return bitset_size;
 }
 
-int load_bits(const char *uuid, const char *which, char *buf, int size)
+/*int load_bits(const char *uuid, const char *which, char *buf, int size)*/
+int pull_state(const char *uuid, const char *which, char *buf, int size)
 {
        int bitset_size;
        struct log_c *lc;
 
        if (!buf)
-               LOG_ERROR("load_bits: buf == NULL");
+               LOG_ERROR("pull_state: buf == NULL");
 
        lc = get_log(uuid);
        if (!lc) {
-               LOG_ERROR("load_bits: No log found for %s", uuid);
+               LOG_ERROR("pull_state: No log found for %s", uuid);
                return -EINVAL;
        }
 
+       if (!strncmp(which, "recovering_region", 17)) {
+               sscanf(buf, "%llu", (unsigned long long 
*)&lc->recovering_region);
+               LOG_DBG("[%s] recovering_region set to %llu",
+                       SHORT_UUID(uuid),
+                       (unsigned long long)lc->recovering_region);
+               return 0;
+       }
+
        bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits);
        if (bitset_size != size) {
-               LOG_ERROR("load_bits: bad bitset_size");
+               LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)",
+                         which, size, bitset_size);
                return -EINVAL;
        }
 
--- cluster/cmirror/src/Attic/functions.h       2008/02/06 23:03:05     1.1.2.4
+++ cluster/cmirror/src/Attic/functions.h       2008/02/08 14:30:10     1.1.2.5
@@ -8,10 +8,16 @@
 
 int local_resume(struct clog_tfr *tfr);
 int cluster_postsuspend(char *);
-int do_request(struct clog_tfr *tfr);
-int commit_log(struct clog_tfr *tfr);
+
+int do_request(struct clog_tfr *tfr, int server);
+
+/*
 int store_bits(const char *uuid, const char *which, char **buf);
 int load_bits(const char *uuid, const char *which, char *buf, int size);
+*/
+int push_state(const char *uuid, const char *which, char **buf);
+int pull_state(const char *uuid, const char *which, char *buf, int size);
+
 int log_get_state(struct clog_tfr *tfr);
 int log_status(int);
 #endif /* __CLOG_FUNCTIONS_DOT_H__ */
--- cluster/cmirror/src/Attic/local.c   2008/02/06 23:03:05     1.1.2.13
+++ cluster/cmirror/src/Attic/local.c   2008/02/08 14:30:10     1.1.2.14
@@ -166,7 +166,8 @@
        case DM_CLOG_STATUS_INFO:
        case DM_CLOG_STATUS_TABLE:
        case DM_CLOG_PRESUSPEND:
-               r = do_request(tfr);
+               /* We do not specify ourselves as server here */
+               r = do_request(tfr, 0);
                if (r)
                        LOG_DBG("Returning failed request to kernel [%s]",
                                RQ_TYPE(tfr->request_type));
@@ -177,7 +178,8 @@
                        
                break;
        case DM_CLOG_POSTSUSPEND:
-               r = do_request(tfr);
+               /* We do not specify ourselves as server here */
+               r = do_request(tfr, 0);
                if (r) {
                        LOG_DBG("Returning failed request to kernel [%s]",
                                RQ_TYPE(tfr->request_type));
@@ -212,6 +214,7 @@
                        LOG_ERROR("[%s] Unable to send %s to cluster: %s",
                                  SHORT_UUID(tfr->uuid),
                                  RQ_TYPE(tfr->request_type), strerror(-r));
+                       tfr->data_size = 0;
                        tfr->error = r;
                        kernel_send(tfr);
                } else {

Reply via email to