CVSROOT: /cvs/cluster Module name: cluster Branch: RHEL5 Changes by: [EMAIL PROTECTED] 2008-02-08 14:30:10
Modified files: cmirror/src : cluster.c cluster.h functions.c functions.h local.c Log message: - stop delaying disk log writes - stop placing requests into the startup queue before initial config - added recovering_region to checkpoint data to prevent duplicate region syncing assignment. Patches: http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.15&r2=1.1.2.16 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.13&r2=1.1.2.14 --- cluster/cmirror/src/Attic/cluster.c 2008/02/06 23:03:05 1.1.2.15 +++ cluster/cmirror/src/Attic/cluster.c 2008/02/08 14:30:10 1.1.2.16 @@ -27,6 +27,13 @@ static SaCkptCallbacksT callbacks = { 0, 0 }; static SaVersionT version = { 'B', 1, 1 }; +#define DEBUGGING_HISTORY 20 +static char debugging[DEBUGGING_HISTORY][128]; +static int idx = 0; +static int memberz = 0; +static int doit = 0; + + struct checkpoint_data { uint32_t requester; char uuid[CPG_MAX_NAME_LENGTH]; @@ -34,6 +41,7 @@ int bitmap_size; /* in bytes */ char *sync_bits; char *clean_bits; + char *recovering_region; struct checkpoint_data *next; }; @@ -58,44 +66,18 @@ static struct list_head clog_cpg_list; /* - * flow_control - * @handle - * - * Returns: 1 if flow control needed, 0 otherwise - */ -static int flow_control(cpg_handle_t handle) -{ - cpg_flow_control_state_t flow_control_state; - cpg_error_t error; - - /* FIXME: no flow control for now (cmirror should self regulate) */ - return 0; - - error = cpg_flow_control_state_get(handle, 
&flow_control_state); - if (error != CPG_OK) { - LOG_ERROR("Failed to get flow control state. Reason: %d", error); - /* FIXME: Better error handling */ - return 0; - } - - return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0; -} - -/* * cluster_send * @tfr * * Returns: 0 on success, -Exxx on error */ -int cluster_send(struct clog_tfr *tfr) +static int cluster_send(struct clog_tfr *tfr) { int r; int found; struct iovec iov; struct clog_cpg *entry, *tmp; - ENTER(); - list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) if (!strncmp(entry->name.value, tfr->uuid, CPG_MAX_NAME_LENGTH)) { found = 1; @@ -104,26 +86,35 @@ if (!found) { tfr->error = -ENOENT; - EXIT(); return -ENOENT; } iov.iov_base = tfr; iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size; - while (flow_control(entry->handle)) { - /* - * FIXME: Don't need to sleep this long - * - * ... or, we could dispatch the queued messages here. - */ - LOG_PRINT("Flow control enabled. Delaying msg [%s]", - RQ_TYPE(tfr->request_type)); - sleep(1); - } + r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); + if (r == CPG_OK) + return 0; + if (r == SA_AIS_ERR_TRY_AGAIN) + return -EAGAIN; + + LOG_ERROR("cpg_mcast_joined error: %d", r); + + tfr->error = -EBADE; + return -EBADE; +} + +int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function) +{ + int r; + + do { + r = cluster_send(tfr); + if (r) + LOG_ERROR("cluster_send failed at: %s:%d (%s)", + file, line, function); + } while (r == -EAGAIN); - EXIT(); - tfr->error = r = (r == CPG_OK) ? 
0 : -EBADE; return r; } @@ -137,7 +128,7 @@ return r; } -static int handle_cluster_request(struct clog_tfr *tfr, int server) +static int handle_cluster_request(struct clog_tfr *tfr, int server, int printz) { int r = 0; @@ -152,27 +143,22 @@ */ if ((tfr->request_type != DM_CLOG_RESUME) || (tfr->originator == my_cluster_id)) - r = do_request(tfr); + r = do_request(tfr, server); if (server) { - if (r) - LOG_ERROR("do_request failed, unable to commit log"); - else - r = commit_log(tfr); - tfr->request_type |= DM_CLOG_RESPONSE; /* * Errors from previous functions are in the tfr struct. */ - - LOG_DBG("Sending response to %u on cluster: [%s/%llu]", - tfr->originator, - RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE), - (unsigned long long)tfr->seq); + if (printz) + LOG_DBG("[%s] Sending response to %u on cluster: [%s/%llu]", + SHORT_UUID(tfr->uuid), tfr->originator, + RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE), + (unsigned long long)tfr->seq); r = cluster_send(tfr); if (r) - LOG_ERROR("cluster_send failed"); + LOG_ERROR("cluster_send failed: %s", strerror(-r)); } EXIT(); @@ -209,6 +195,8 @@ INIT_LIST_HEAD(&l); queue_remove_all(&l, cluster_queue); LOG_ERROR("Current list:"); + if (list_empty(&l)) + LOG_ERROR(" [none]"); list_for_each_safe(p, n, &l) { list_del_init(p); t = (struct clog_tfr *)p; @@ -257,6 +245,7 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry, uint32_t cp_requester) { + int r; struct checkpoint_data *new; new = malloc(sizeof(*new)); @@ -270,7 +259,7 @@ strncpy(new->uuid, entry->name.value, entry->name.length); if (entry->valid) { - new->bitmap_size = store_bits(entry->name.value, "clean_bits", + new->bitmap_size = push_state(entry->name.value, "clean_bits", &new->clean_bits); if (new->bitmap_size <= 0) { LOG_ERROR("Failed to store clean_bits to checkpoint for node %u", @@ -279,7 +268,7 @@ return NULL; } - new->bitmap_size = store_bits(entry->name.value, + new->bitmap_size = push_state(entry->name.value, "sync_bits", 
&new->sync_bits); if (new->bitmap_size <= 0) { LOG_ERROR("Failed to store sync_bits to checkpoint for node %u", @@ -288,6 +277,16 @@ free(new); return NULL; } + + r = push_state(entry->name.value, "recovering_region", &new->recovering_region); + if (r <= 0) { + LOG_ERROR("Failed to store recovering_region to checkpoint for node %u", + new->requester); + free(new->sync_bits); + free(new->clean_bits); + free(new); + return NULL; + } } else { /* * We can store bitmaps yet, because the log is not @@ -309,6 +308,7 @@ */ static void free_checkpoint(struct checkpoint_data *cp) { + free(cp->recovering_region); free(cp->sync_bits); free(cp->clean_bits); free(cp); @@ -335,9 +335,9 @@ name.length = len; attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS; - attr.checkpointSize = cp->bitmap_size * 2; + attr.checkpointSize = cp->bitmap_size * 2 + strlen(cp->recovering_region) + 1; attr.retentionDuration = SA_TIME_MAX; - attr.maxSections = 3; /* don't know why we need +1 */ + attr.maxSections = 4; /* don't know why we need +1 */ attr.maxSectionSize = cp->bitmap_size; attr.maxSectionIdSize = 22; @@ -363,6 +363,7 @@ EXIT(); return -EIO; /* FIXME: better error */ } + /* * Add section for sync_bits */ @@ -408,7 +409,7 @@ } if (rv == SA_AIS_ERR_EXIST) { - LOG_ERROR("export_checkpoint: clean checkpoint section already exists"); + LOG_DBG("export_checkpoint: clean checkpoint section already exists"); EXIT(); return -EEXIST; } @@ -419,6 +420,35 @@ return -EIO; /* FIXME: better error */ } + /* + * Add section for recovering_region + */ + section_id.idLen = snprintf(buf, 32, "recovering_region"); + section_id.id = (unsigned char *)buf; + section_attr.sectionId = &section_id; + section_attr.expirationTime = SA_TIME_END; + +rr_create_retry: + rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region, + strlen(cp->recovering_region) + 1); + if (rv == SA_AIS_ERR_TRY_AGAIN) { + LOG_ERROR("export_checkpoint: RR create retry"); + sleep(1); + goto rr_create_retry; + } + + if (rv == SA_AIS_ERR_EXIST) { + 
LOG_DBG("export_checkpoint: RR checkpoint section already exists"); + EXIT(); + return -EEXIST; + } + + if (rv != SA_AIS_OK) { + LOG_ERROR("export_checkpoint: RR checkpoint section creation failed"); + EXIT(); + return -EIO; /* FIXME: better error */ + } + LOG_DBG("export_checkpoint: closing checkpoint"); saCkptCheckpointClose(h); @@ -515,7 +545,7 @@ break; } saCkptSectionIterationFinalize(itr); - if (len != 2) { + if (len != 3) { LOG_ERROR("import_checkpoint: %d checkpoint sections found", len); sleep(1); goto init_retry; @@ -572,8 +602,9 @@ */ if (iov.readSize) { - if (load_bits(entry->name.value, (char *)desc.sectionId.id, bitmap, iov.readSize)) { - LOG_ERROR("Error loading bits"); + if (pull_state(entry->name.value, (char *)desc.sectionId.id, bitmap, + iov.readSize)) { + LOG_ERROR("Error loading state"); rtn = -EIO; goto fail; } @@ -645,6 +676,7 @@ int i; int r = 0; int i_am_server; + int response = 0; struct clog_tfr *tfr = msg; struct clog_tfr *startup_tfr = NULL; struct clog_cpg *match; @@ -665,7 +697,9 @@ (unsigned long long)tfr->seq); if (my_cluster_id == 0xDEAD) { - LOG_DBG("Message before init... ignoring.\n"); + LOG_DBG("[%s] Message from %u before init [%s/%llu]", + SHORT_UUID(tfr->uuid), nodeid, + RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq); return; } @@ -674,6 +708,14 @@ LOG_ERROR("Unable to find clog_cpg for cluster message"); return; } + + if (match->lowest_id == 0xDEAD) { + LOG_DBG("[%s] Message from %u before init* [%s/%llu]", + SHORT_UUID(tfr->uuid), nodeid, + RQ_TYPE(tfr->request_type), (unsigned long long) tfr->seq); + return; + } + i_am_server = (my_cluster_id == match->lowest_id) ? 
1 : 0; if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) { @@ -691,7 +733,7 @@ LOG_DBG("Processing delayed request %d: %s", match->startup_queue->count, RQ_TYPE(startup_tfr->request_type)); - r = handle_cluster_request(startup_tfr, i_am_server); + r = handle_cluster_request(startup_tfr, i_am_server, 1); if (r) { LOG_ERROR("Error while processing delayed CPG message"); @@ -732,9 +774,10 @@ match->checkpoint_list = new; } - if (tfr->request_type & DM_CLOG_RESPONSE) + if (tfr->request_type & DM_CLOG_RESPONSE) { + response = 1; r = handle_cluster_response(tfr); - else { + } else { tfr->originator = nodeid; if (!match->valid) { @@ -757,15 +800,40 @@ goto out; } - r = handle_cluster_request(tfr, i_am_server); + r = handle_cluster_request(tfr, i_am_server, + ((memberz != 4) || (--doit > 0))); } out: if (r) { - LOG_ERROR("[%s] Error while processing CPG message, %s: %d", + LOG_ERROR("[%s] Error while processing CPG message, %s: %s", SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE), - r); + strerror(-r)); + LOG_ERROR("[%s] Response : %s", SHORT_UUID(tfr->uuid), + (response) ? "YES" : "NO"); + LOG_ERROR("[%s] Originator: %u", SHORT_UUID(tfr->uuid), tfr->originator); + if (response) + LOG_ERROR("[%s] Responder : %u", SHORT_UUID(tfr->uuid), nodeid); + LOG_ERROR("HISTORY::"); + + for (i = 0; i < DEBUGGING_HISTORY; i++) { + idx++; + idx = idx % DEBUGGING_HISTORY; + if (debugging[idx][0] == '\0') + continue; + LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]); + } + } else if (!(tfr->request_type & DM_CLOG_RESPONSE)) { + int len; + idx++; + idx = idx % DEBUGGING_HISTORY; + len = sprintf(debugging[idx], "SEQ#=%llu, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s", + (unsigned long long)tfr->seq, SHORT_UUID(tfr->uuid), + RQ_TYPE(tfr->request_type), + tfr->originator, (response) ? 
"YES" : "NO"); + if (response) + sprintf(debugging[idx] + len, ", RSPR=%u", nodeid); } EXIT(); } @@ -779,10 +847,12 @@ int my_pid = getpid(); int found = 0; struct clog_cpg *match, *tmp; - uint32_t lowest; + uint32_t lowest = 0xDEAD; ENTER(); + memberz = member_list_entries; + LOG_DBG("****** CPG config callback **[%s]**", SHORT_UUID(gname->value)); @@ -821,6 +891,8 @@ goto out; } + lowest = match->lowest_id; + /* Am I leaving? */ for (i = 0; i < left_list_entries; i++) if (my_cluster_id == left_list[i].nodeid) { @@ -863,6 +935,7 @@ free(match->startup_queue); match->free_me = 1; + match->lowest_id = 0xDEAD; goto out; } @@ -871,8 +944,6 @@ if (!left_list_entries && (member_list_entries == 1) && (joined_list_entries == 1) && (member_list[0].nodeid == joined_list[0].nodeid)) { - LOG_DBG("[%s] I am the log server (and first to join)", - SHORT_UUID(match->name.value)); match->lowest_id = my_cluster_id = joined_list[0].nodeid; match->valid = 1; goto out; @@ -894,17 +965,15 @@ } } - lowest = match->lowest_id; + if (member_list_entries) + match->lowest_id = member_list[0].nodeid; + else + match->lowest_id = 0xDEAD; /* Find the lowest_id, i.e. the server */ - for (i = 0, match->lowest_id = member_list[0].nodeid; - i < member_list_entries; i++) + for (i = 0; i < member_list_entries; i++) if (match->lowest_id > member_list[i].nodeid) match->lowest_id = member_list[i].nodeid; - if (lowest != match->lowest_id) - LOG_DBG("[%s] Server is now %u", SHORT_UUID(match->name.value), - match->lowest_id); - /* * If I am part of the joining list, I do not send checkpoints * FIXME: What are the cases where multiple nodes can join? @@ -920,6 +989,21 @@ match->checkpoints_needed += i; out: + if (lowest != match->lowest_id) + LOG_DBG("[%s] Server change %u -> %u (%u %s)", + SHORT_UUID(match->name.value), + lowest, match->lowest_id, + (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid, + (joined_list_entries && (member_list_entries == 1)) ? 
+ "is first to join" : (joined_list_entries) ? "joined" : "left"); + else + LOG_DBG("[%s] Server unchanged at %u (%u %s)", + SHORT_UUID(match->name.value), lowest, + (joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid, + (joined_list_entries) ? "joined" : "left"); + + if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id)) + doit = 25; EXIT(); } @@ -1019,6 +1103,12 @@ ENTER(); + { + int i; + for(i = 0; i < DEBUGGING_HISTORY; i++) + debugging[i][0] = '\0'; + } + INIT_LIST_HEAD(&clog_cpg_list); rv = saCkptInitialize(&ckpt_handle, &callbacks, &version); --- cluster/cmirror/src/Attic/cluster.h 2008/01/23 21:21:06 1.1.2.2 +++ cluster/cmirror/src/Attic/cluster.h 2008/02/08 14:30:10 1.1.2.3 @@ -7,6 +7,7 @@ int create_cluster_cpg(char *str); int destroy_cluster_cpg(char *str); -int cluster_send(struct clog_tfr *tfr); +int cluster_send_helper(struct clog_tfr *tfr, int line, char *file, const char *function); +#define cluster_send(x) cluster_send_helper((x), __LINE__, __FILE__, __FUNCTION__) #endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */ --- cluster/cmirror/src/Attic/functions.c 2008/02/06 23:03:05 1.1.2.13 +++ cluster/cmirror/src/Attic/functions.c 2008/02/08 14:30:10 1.1.2.14 @@ -33,14 +33,6 @@ uint64_t nr_regions; }; -/* - * Used by the 'touched' variable, these macros mean: - * LOG_CHANGED - bits in the in-memory log have changed - * LOG_FLUSH - log must be committed to disk - */ -#define LOG_CHANGED 1 -#define LOG_FLUSH 2 - struct log_c { struct list_head list; @@ -103,13 +95,13 @@ static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit) { ext2fs_set_bit(bit, (unsigned int *) bs); - lc->touched |= LOG_CHANGED; + lc->touched = 1; } static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit) { ext2fs_clear_bit(bit, (unsigned int *) bs); - lc->touched |= LOG_CHANGED; + lc->touched = 1; } /* FIXME: Why aren't count and start the same type? 
*/ @@ -205,7 +197,7 @@ if (r < 0) { LOG_ERROR("rw_log: write failure: %s", strerror(errno)); - return -EIO; + return -EIO; /* Failed disk write */ } return 0; } @@ -216,7 +208,7 @@ LOG_ERROR("rw_log: read failure: %s", strerror(errno)); if (r != lc->disk_size) - return -EIO; + return -EIO; /* Failed disk read */ return 0; } @@ -239,7 +231,7 @@ memset(&lh, 0, sizeof(struct log_header)); if (rw_log(lc, 0)) - return -EIO; + return -EIO; /* Failed disk read */ header_from_disk(&lh, lc->disk_buffer); if (lh.magic != MIRROR_MAGIC) { @@ -285,8 +277,10 @@ bitset_size += (lc->region_count % 8) ? 1 : 0; memcpy(lc->disk_buffer + 1024, lc->sync_bits, bitset_size); - if (rw_log(lc, 1)) - return -EIO; + if (rw_log(lc, 1)) { + lc->log_dev_failed = 1; + return -EIO; /* Failed disk write */ + } return 0; } @@ -697,6 +691,7 @@ static int clog_resume(struct clog_tfr *tfr) { uint32_t i; + int commit_log = 0; struct log_c *lc = get_log(tfr->uuid); size_t size = lc->bitset_uint32_count * sizeof(uint32_t); @@ -715,6 +710,7 @@ LOG_DBG("[%s] Master resume: reading disk log", SHORT_UUID(lc->uuid)); lc->resume_override = 1000; + commit_log = 1; break; case 1: LOG_ERROR("Error:: partial bit loading (just sync_bits)"); @@ -782,11 +778,14 @@ SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count); lc->sync_search = 0; - /* - * We mark 'touched' as LOG_FLUSH so that only the master commits - * the log via 'commit_log' - */ - lc->touched = LOG_FLUSH; + if (commit_log && (lc->disk_fd >= 0)) { + tfr->error = write_log(lc); + if (tfr->error) + LOG_ERROR("Failed initial disk log write"); + else + LOG_DBG("Disk log initialized"); + lc->touched = 0; + } out: lc->state = LOG_RESUMED; lc->recovery_halted = 0; @@ -917,20 +916,34 @@ * @tfr * */ -static int clog_flush(struct clog_tfr *tfr) +static int clog_flush(struct clog_tfr *tfr, int server) { + int r = 0; struct log_c *lc = get_log(tfr->uuid); - + if (!lc) return -EINVAL; - /* - * Actual disk flush happens in 'commit_log()' - * Clear LOG_CHANGED and 
set LOG_FLUSH + if (!lc->touched) + return 0; + + /* + * Do the actual flushing of the log only + * if we are the server. */ - lc->touched = LOG_FLUSH; + if (server && (lc->disk_fd >= 0)) { + r = tfr->error = write_log(lc); + if (r) { + LOG_ERROR("Error writing to disk log"); + return r; + } + LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid)); + } + + lc->touched = 0; return 0; + } /* @@ -1179,14 +1192,18 @@ if (pkg->in_sync) { if (log_test_bit(lc->sync_bits, pkg->region)) { - LOG_PRINT(" Region already in-sync: %llu", - (unsigned long long)pkg->region); + LOG_ERROR("[%s] Region already in-sync: region=%llu, seq=%llu, who=%u", + SHORT_UUID(lc->uuid), + (unsigned long long)pkg->region, + (unsigned long long)tfr->seq, + tfr->originator); } else { log_set_bit(lc, lc->sync_bits, pkg->region); lc->sync_count++; - LOG_DBG("[%s] sync_count = %llu, Region %llu marked in-sync by %u", + LOG_DBG("[%s] sync_count=%llu, Region %llu marked in-sync by %u, seq=%llu", SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count, - (unsigned long long)pkg->region, tfr->originator); + (unsigned long long)pkg->region, tfr->originator, + (unsigned long long)tfr->seq); } } else if (log_test_bit(lc->sync_bits, pkg->region)) { lc->sync_count--; @@ -1249,7 +1266,7 @@ tfr->data_size = sprintf(data, "3 clustered_disk %d:%d %c", major(statbuf.st_rdev), minor(statbuf.st_rdev), - 'A'); /* FIXME: detect dead device */ + (lc->log_dev_failed) ? 'D' : 'A'); return 0; } @@ -1396,6 +1413,7 @@ /* * do_request * @tfr: the request + * @server: is this request performed by the server * * An inability to perform this function will return an error * from this function. 
However, an inability to successfully @@ -1403,7 +1421,7 @@ * * Returns: 0 on success, -EXXX on error */ -int do_request(struct clog_tfr *tfr) +int do_request(struct clog_tfr *tfr, int server) { int r; @@ -1442,7 +1460,7 @@ r = clog_in_sync(tfr); break; case DM_CLOG_FLUSH: - r = clog_flush(tfr); + r = clog_flush(tfr, server); break; case DM_CLOG_MARK_REGION: r = clog_mark_region(tfr); @@ -1489,52 +1507,6 @@ return 0; } -/* - * commit_log - * @tfr: commit log associated with this request - * - * This function will also set 'tfr->error' on failure - * - * Returns: 0 on success, -EXXX on error - */ -int commit_log(struct clog_tfr *tfr) -{ - int r = 0; - struct log_c *lc; - - ENTER(); - - lc = get_log(tfr->uuid); - - if (!lc) { - LOG_DBG("No log found"); - tfr->error = -EINVAL; - r = -EINVAL; - goto out; - } - - if (!(lc->touched & LOG_FLUSH)) - goto out; - - if (lc->disk_fd >= 0) { - r = tfr->error = write_log(lc); - if (r) { - LOG_ERROR("Error writing to disk log"); - return -EIO; - } - LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid)); - } - - if (lc->touched & LOG_CHANGED) - LOG_ERROR("WARNING: Log has changed during a flush operation"); - - lc->touched &= ~LOG_FLUSH; - -out: - EXIT(); - return 0; -} - static void print_bits(char *buf, int size) { #ifdef DEBUG @@ -1556,7 +1528,8 @@ #endif } -int store_bits(const char *uuid, const char *which, char **buf) +/* int store_bits(const char *uuid, const char *which, char **buf)*/ +int push_state(const char *uuid, const char *which, char **buf) { int bitset_size; struct log_c *lc; @@ -1570,8 +1543,18 @@ return -EINVAL; } + if (!strcmp(which, "recovering_region")) { + *buf = malloc(32); /* easily covers largest 64-bit int */ + if (!*buf) + return -ENOMEM; + sprintf(*buf, "%llu", (unsigned long long)lc->recovering_region); + + return 32; + } + bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits); *buf = malloc(bitset_size); + if (!*buf) { LOG_ERROR("store_bits: Unable to allocate memory"); return -ENOMEM; @@ 
-1590,23 +1573,33 @@ return bitset_size; } -int load_bits(const char *uuid, const char *which, char *buf, int size) +/*int load_bits(const char *uuid, const char *which, char *buf, int size)*/ +int pull_state(const char *uuid, const char *which, char *buf, int size) { int bitset_size; struct log_c *lc; if (!buf) - LOG_ERROR("load_bits: buf == NULL"); + LOG_ERROR("pull_state: buf == NULL"); lc = get_log(uuid); if (!lc) { - LOG_ERROR("load_bits: No log found for %s", uuid); + LOG_ERROR("pull_state: No log found for %s", uuid); return -EINVAL; } + if (!strncmp(which, "recovering_region", 17)) { + sscanf(buf, "%llu", (unsigned long long *)&lc->recovering_region); + LOG_DBG("[%s] recovering_region set to %llu", + SHORT_UUID(uuid), + (unsigned long long)lc->recovering_region); + return 0; + } + bitset_size = lc->bitset_uint32_count * sizeof(*lc->clean_bits); if (bitset_size != size) { - LOG_ERROR("load_bits: bad bitset_size"); + LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)", + which, size, bitset_size); return -EINVAL; } --- cluster/cmirror/src/Attic/functions.h 2008/02/06 23:03:05 1.1.2.4 +++ cluster/cmirror/src/Attic/functions.h 2008/02/08 14:30:10 1.1.2.5 @@ -8,10 +8,16 @@ int local_resume(struct clog_tfr *tfr); int cluster_postsuspend(char *); -int do_request(struct clog_tfr *tfr); -int commit_log(struct clog_tfr *tfr); + +int do_request(struct clog_tfr *tfr, int server); + +/* int store_bits(const char *uuid, const char *which, char **buf); int load_bits(const char *uuid, const char *which, char *buf, int size); +*/ +int push_state(const char *uuid, const char *which, char **buf); +int pull_state(const char *uuid, const char *which, char *buf, int size); + int log_get_state(struct clog_tfr *tfr); int log_status(int); #endif /* __CLOG_FUNCTIONS_DOT_H__ */ --- cluster/cmirror/src/Attic/local.c 2008/02/06 23:03:05 1.1.2.13 +++ cluster/cmirror/src/Attic/local.c 2008/02/08 14:30:10 1.1.2.14 @@ -166,7 +166,8 @@ case DM_CLOG_STATUS_INFO: case DM_CLOG_STATUS_TABLE: 
case DM_CLOG_PRESUSPEND: - r = do_request(tfr); + /* We do not specify ourselves as server here */ + r = do_request(tfr, 0); if (r) LOG_DBG("Returning failed request to kernel [%s]", RQ_TYPE(tfr->request_type)); @@ -177,7 +178,8 @@ break; case DM_CLOG_POSTSUSPEND: - r = do_request(tfr); + /* We do not specify ourselves as server here */ + r = do_request(tfr, 0); if (r) { LOG_DBG("Returning failed request to kernel [%s]", RQ_TYPE(tfr->request_type)); @@ -212,6 +214,7 @@ LOG_ERROR("[%s] Unable to send %s to cluster: %s", SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type), strerror(-r)); + tfr->data_size = 0; tfr->error = r; kernel_send(tfr); } else {