CVSROOT: /cvs/cluster Module name: cluster Branch: RHEL5 Changes by: [EMAIL PROTECTED] 2007-11-08 22:16:53
Modified files: cmirror/src : Makefile cluster.c functions.c local.c cmirror-kernel/src: dm-clog-tfr.c dm-clog-tfr.h Added files: cmirror/src : rbtree.c rbtree.h Log message: - Only write the disk log on a 'flush', not every time a mark/clear happens - Add mark request tracking so we don't clear log bits prematurely (and to reduce the number of disk writes). - Add priority recovery - regions that are currently being written to are given first priority during recovery - Introduce CPG flow control Patches: http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/rbtree.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=NONE&r2=1.1.2.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/rbtree.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=NONE&r2=1.1.2.1 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/Makefile.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.3.2.3&r2=1.3.2.4 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/cluster.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.4&r2=1.1.2.5 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/functions.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror/src/local.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.3&r2=1.1.2.4 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog-tfr.c.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3 http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/cmirror-kernel/src/dm-clog-tfr.h.diff?cvsroot=cluster&only_with_tag=RHEL5&r1=1.1.2.2&r2=1.1.2.3 --- cluster/cmirror/src/Attic/Makefile 2007/11/03 18:37:48 1.3.2.3 +++ cluster/cmirror/src/Attic/Makefile 2007/11/08 22:16:53 1.3.2.4 @@ -23,6 +23,8 @@ echo '-I${incdir}'; else \ echo ''; fi) +SOURCES = $(shell ls *.c) + ifneq (${TMP_INCLUDE}, ) INCLUDE += ${TMP_INCLUDE} -I. 
else @@ -42,7 +44,8 @@ all: ${TARGET} -clogd: link_mon.c logging.c queues.c local.c cluster.c functions.c clogd.c +#clogd: rbtree.c link_mon.c logging.c queues.c local.c cluster.c functions.c clogd.c +clogd: ${SOURCES} ${CC} ${CFLAGS} -o $@ $^ ${LDFLAGS} no_files: --- cluster/cmirror/src/Attic/cluster.c 2007/11/05 22:44:03 1.1.2.4 +++ cluster/cmirror/src/Attic/cluster.c 2007/11/08 22:16:53 1.1.2.5 @@ -58,6 +58,27 @@ static struct list_head clog_cpg_list; /* + * flow_control + * @handle + * + * Returns: 1 if flow control needed, 0 otherwise + */ +static int flow_control(cpg_handle_t handle) +{ + cpg_flow_control_state_t flow_control_state; + cpg_error_t error; + + error = cpg_flow_control_state_get(handle, &flow_control_state); + if (error != CPG_OK) { + LOG_ERROR("Failed to get flow control state. Reason: %d", error); + /* FIXME: Better error handling */ + return 0; + } + + return (flow_control_state == CPG_FLOW_CONTROL_ENABLED) ? 1 : 0; +} + +/* * cluster_send * @tfr * @@ -86,6 +107,16 @@ iov.iov_base = tfr; iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size; + while (flow_control(entry->handle)) { + /* + * FIXME: Don't need to sleep this long + * + * ... or, we could dispatch the queued messages here. + */ + LOG_PRINT("Flow control enabled. 
Delaying msg [%s]", + RQ_TYPE(tfr->request_type)); + sleep(1); + } r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); EXIT(); @@ -271,7 +302,7 @@ char buf[32]; ENTER(); - LOG_PRINT("Sending checkpointed data to %u", cp->requester); + LOG_DBG("Sending checkpointed data to %u", cp->requester); len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%u", cp->requester); name.length = len; @@ -540,6 +571,9 @@ ENTER(); list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) { r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL); + if (r != SA_AIS_OK) + LOG_ERROR("cpg_dispatch failed: %d", r); + for (cp = entry->checkpoint_list; cp;) { LOG_ERROR("Checkpoint data available for node %u", cp->requester); @@ -575,7 +609,6 @@ int i_am_server; struct clog_tfr *tfr = msg; struct clog_tfr *startup_tfr = NULL; - struct clog_tfr *cp_tfr = NULL; struct clog_cpg *match; ENTER(); @@ -603,57 +636,31 @@ i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0; if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) { - /* Redundant checkpoints ignored due to match->valid */ if (my_cluster_id == tfr->originator) { - switch (import_checkpoint(match, match->valid)) { - case 0: - if (!match->valid) { - LOG_DBG("Checkpoint data recieved. Log is now valid"); - match->valid = 1; - while ((startup_tfr = queue_remove(match->startup_queue))) { - LOG_DBG("Processing delayed request %d: %s", - match->startup_queue->count, - RQ_TYPE(startup_tfr->request_type)); - r = handle_cluster_request(startup_tfr, i_am_server); - - if (r) { - LOG_ERROR("Error while processing delayed CPG message"); - goto out; - } else { - queue_add(startup_tfr, free_queue); - } - } - } - - break; - case -EAGAIN: - LOG_PRINT("Checkpoint data empty. 
Requesting new checkpoint."); - - cp_tfr = queue_remove(free_queue); - if (!cp_tfr) { - /* FIXME: better error handling */ - LOG_ERROR("No clog_tfr struct available"); - goto out; - } - memset(cp_tfr, 0, sizeof(*cp_tfr)); - cp_tfr->request_type = DM_CLOG_CHECKPOINT_REQUEST; - - cp_tfr->originator = my_cluster_id; - - strncpy(cp_tfr->uuid, tfr->uuid, CPG_MAX_NAME_LENGTH); - - if ((r = cluster_send(cp_tfr))) { - /* FIXME: better error handling */ - LOG_ERROR("Failed to send checkpoint ready notice"); - queue_add(cp_tfr, free_queue); - goto out; - } - queue_add(cp_tfr, free_queue); - - break; - default: + /* Redundant checkpoints ignored if match->valid */ + if (import_checkpoint(match, match->valid)) { LOG_ERROR("Failed to import checkpoint"); /* Could we retry? */ + goto out; + } else if (!match->valid) { + LOG_DBG("Checkpoint data recieved. Log is now valid"); + match->valid = 1; + while ((startup_tfr = queue_remove(match->startup_queue))) { + LOG_DBG("Processing delayed request %d: %s", + match->startup_queue->count, + RQ_TYPE(startup_tfr->request_type)); + r = handle_cluster_request(startup_tfr, i_am_server); + + if (r) { + LOG_ERROR("Error while processing delayed CPG message"); + /* + * FIXME: If we error out here, we will never get + * another opportunity to retry these requests + */ + goto out; + } + queue_add(startup_tfr, free_queue); + } } } goto out; @@ -760,8 +767,8 @@ (member_list_entries == 1) && (joined_list_entries == 1) && (member_list[0].nodeid == joined_list[0].nodeid)) { match->lowest_id = my_cluster_id = joined_list[0].nodeid; - LOG_PRINT("I am the log server (and first to join) for %s", - match->name.value); + LOG_DBG("I am the log server (and first to join) for %s", + match->name.value); match->valid = 1; goto out; } @@ -789,7 +796,7 @@ if (match->lowest_id > member_list[i].nodeid) match->lowest_id = member_list[i].nodeid; - LOG_PRINT("Server is now %u", match->lowest_id); + LOG_DBG("Server is now %u", match->lowest_id); /* * If I am part of 
the joining list, I do not send checkpoints --- cluster/cmirror/src/Attic/functions.c 2007/11/05 22:44:03 1.1.2.3 +++ cluster/cmirror/src/Attic/functions.c 2007/11/08 22:16:53 1.1.2.4 @@ -15,6 +15,7 @@ #include "common.h" #include "cluster.h" #include "logging.h" +#include "rbtree.h" #define BYTE_SHIFT 3 @@ -27,6 +28,14 @@ #define MIRROR_DISK_VERSION 2 #define LOG_OFFSET 2 +/* + * Used by the 'touched' variable, these macros mean: + * LOG_CHANGED - bits in the in-memory log have changed + * LOG_FLUSH - log must be committed to disk + */ +#define LOG_CHANGED 1 +#define LOG_FLUSH 2 + struct log_header { uint32_t magic; uint32_t version; @@ -58,6 +67,9 @@ uint32_t state; /* current operational state of the log */ + struct rb_tree mark_tree; /* Tree that tracks all mark requests */ + struct recovery_request *recovery_request_list; + int disk_fd; /* -1 means no disk log */ int log_dev_failed; uint64_t disk_nr_regions; @@ -65,9 +77,20 @@ void *disk_buffer; /* aligned memory for O_DIRECT */ }; +struct mark_entry { + uint32_t nodeid; + uint64_t region; +}; + +struct recovery_request { + uint64_t region; + struct recovery_request *next; +}; + static struct list_head log_list = LIST_HEAD_INIT(log_list); static struct list_head log_pending_list = LIST_HEAD_INIT(log_pending_list); + static int log_test_bit(uint32_t *bs, unsigned bit) { return ext2fs_test_bit(bit, (unsigned int *) bs) ? 1 : 0; @@ -76,13 +99,13 @@ static void log_set_bit(struct log_c *lc, uint32_t *bs, unsigned bit) { ext2fs_set_bit(bit, (unsigned int *) bs); - lc->touched = 1; + lc->touched |= LOG_CHANGED; } static void log_clear_bit(struct log_c *lc, uint32_t *bs, unsigned bit) { ext2fs_clear_bit(bit, (unsigned int *) bs); - lc->touched = 1; + lc->touched |= LOG_CHANGED; } /* FIXME: Why aren't count and start the same type? 
*/ @@ -262,6 +285,47 @@ return 0; } +static void *get_mark_entry_region(void *data) +{ + struct mark_entry *m = data; + + return (void *)&m->region; +} + +static int cmp_mark_entry_regions(void *a, void *b) +{ + uint64_t _a = *((uint64_t *)a); + uint64_t _b = *((uint64_t *)b); + + return (_a == _b) ? 0 : (_a < _b) ? -1 : 1; +} + +/* + * srsm_count - Same Region, Same Machine count + * @data - data held in the tree node (a mark_entry ptr) + * @adata - additional data passed in (nodeid) + * + * This function always returns 1 - allowing the RBT search + * to continuing finding additional matches. It's useful + * feature is that it counts all the tree nodes that match + * the given machine (not just the region). Results of the + * count is placed in 'srsm_count_var'. + * + * Returns: 1 + */ +static int srsm_count_var = 0; +static int srsm_count(void *data, void *adata) +{ + uint32_t nodeid = *((uint32_t *)adata); + struct mark_entry *m = data; + + if (nodeid == m->nodeid) + srsm_count_var++; + + return 1; +} + + static int find_disk_path(char *major_minor_str, char *path_rtn) { int r; @@ -394,7 +458,11 @@ lc->disk_fd = -1; lc->log_dev_failed = 0; - lc->bitset_uint32_count = region_count / (sizeof(*lc->clean_bits) << BYTE_SHIFT); + rbt_init(&lc->mark_tree, sizeof(struct mark_entry), + get_mark_entry_region, cmp_mark_entry_regions); + + lc->bitset_uint32_count = region_count / + (sizeof(*lc->clean_bits) << BYTE_SHIFT); if (region_count % (sizeof(*lc->clean_bits) << BYTE_SHIFT)) lc->bitset_uint32_count++; @@ -622,7 +690,7 @@ LOG_DBG("Non-master resume: bits pre-loaded"); lc->resume_override = 1000; lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count); - LOG_DBG("sync_count = %llu", lc->sync_count); + LOG_DBG("Initial sync_count = %llu", lc->sync_count); goto out; return 0; default: @@ -640,9 +708,9 @@ switch (tfr->error) { case 0: if (lc->disk_nr_regions < lc->region_count) - LOG_PRINT("Mirror has grown, updating log bits"); + LOG_DBG("Mirror has grown, 
updating log bits"); else if (lc->disk_nr_regions > lc->region_count) - LOG_PRINT("Mirror has shrunk, updating log bits"); + LOG_DBG("Mirror has shrunk, updating log bits"); break; case -EINVAL: LOG_DBG("Read log failed: not yet initialized"); @@ -669,14 +737,14 @@ /* copy clean across to sync */ memcpy(lc->sync_bits, lc->clean_bits, size); lc->sync_count = count_bits32(lc->clean_bits, lc->bitset_uint32_count); - LOG_DBG("sync_count = %llu", lc->sync_count); + LOG_DBG("Initial sync_count = %llu", lc->sync_count); lc->sync_search = 0; /* - * We mark 'touched' so that only the master commits + * We mark 'touched' as LOG_FLUSH so that only the master commits * the log via 'commit_log' */ - lc->touched = 1; + lc->touched = LOG_FLUSH; out: lc->state = LOG_RESUMED; @@ -789,6 +857,11 @@ return -EINVAL; *rtn = log_test_bit(lc->sync_bits, region); + if (*rtn) + LOG_DBG(" Region is in-sync: %llu", region); + else + LOG_DBG(" Region is not in-sync: %llu", region); + tfr->data_size = sizeof(*rtn); return 0; @@ -814,7 +887,61 @@ !log_test_bit(lc->clean_bits, lc->recovering_region)) return -EAGAIN; - /* Actual disk flush happens in 'commit_log()' */ + /* + * Actual disk flush happens in 'commit_log()' + * Clear LOG_CHANGED and set LOG_FLUSH + */ + lc->touched = LOG_FLUSH; + + return 0; +} + +/* + * mark_region + * @lc + * @region + * @who + * + * Put a mark region request in the tree for tracking. + * + * Returns: 0 on success, -EXXX on error + */ +static int mark_region(struct log_c *lc, uint64_t region, uint32_t who) +{ + struct rb_node *new; + struct mark_entry *m; + + /* + * The search will find every node in the tree that has + * the same region marked. The additional function + * passed to 'rbt_search_plus' detects if the matching + * nodes also are from the machine who is performing this + * request. 
+ */ + srsm_count_var = 0; + if (!rbt_search_plus(&lc->mark_tree, ®ion, srsm_count, &who)) + log_clear_bit(lc, lc->clean_bits, region); + + /* Requesting region/nodeid is already in the tree */ + if (srsm_count_var) + return 0; + + /* + * Save allocation until here - if there is a failure, + * at least we have cleared the bit. + */ + new = rbt_alloc_node(&lc->mark_tree); + if (!new) { + LOG_ERROR("Unable to allocate space for mark_entry: %llu/%u", + region, who); + return -ENOMEM; + } + + m = new->rb_data; + m->nodeid = who; + m->region = region; + rbt_insert(&lc->mark_tree, new); + return 0; } @@ -827,9 +954,9 @@ * * Returns: 0 on success, -EXXX on failure */ -static void print_bits(char *buf, int size); static int clog_mark_region(struct clog_tfr *tfr) { + int r; int count; uint64_t *region; struct log_c *lc = get_log(tfr->uuid); @@ -845,14 +972,58 @@ count = tfr->data_size / sizeof(uint64_t); region = (uint64_t *)&tfr->data; - for (; count > 0; count--, region++) - log_clear_bit(lc, lc->clean_bits, *region); + for (; count > 0; count--, region++) { + r = mark_region(lc, *region, tfr->originator); + if (r) + return r; + } tfr->data_size = 0; return 0; } +static int clear_region(struct log_c *lc, uint64_t region, uint32_t who) +{ + int set_bit = 1; + struct rb_node *mark_list; + struct mark_entry *m; + + srsm_count_var = 0; + mark_list = rbt_search_plus(&lc->mark_tree, ®ion, srsm_count, &who); + if (!mark_list || !srsm_count_var) { + LOG_DBG("Clear issued on region that is not marked: %llu/%u", + region, who); + goto set_bit; + } + + /* If rb_next is set, it means more than one node has this marked */ + if (mark_list->rb_next) + set_bit = 0; + + /* Must find this machine's entry to remove it */ + for (; mark_list; mark_list = mark_list->rb_next) { + m = mark_list->rb_data; + if (m->nodeid == who) + break; + } + + if (!mark_list) { + LOG_ERROR("Bad programming: searches disagree on results"); + goto set_bit; + } + + rbt_remove(&lc->mark_tree, mark_list); + 
rbt_free_node(&lc->mark_tree, mark_list); + +set_bit: + /* Only clear the region if it is also in-sync */ + if (set_bit && log_test_bit(lc->sync_bits, region)) + log_set_bit(lc, lc->clean_bits, region); + + return 0; +} + /* * clog_clear_region * @tfr @@ -864,6 +1035,7 @@ */ static int clog_clear_region(struct clog_tfr *tfr) { + int r; int count; uint64_t *region; struct log_c *lc = get_log(tfr->uuid); @@ -879,8 +1051,12 @@ count = tfr->data_size / sizeof(uint64_t); region = (uint64_t *)&tfr->data; - for (; count > 0; count--, region++) - log_set_bit(lc, lc->clean_bits, *region); + for (; count > 0; count--, region++) { + r = clear_region(lc, *region, tfr->originator); + if (r) + return r; + } + tfr->data_size = 0; return 0; @@ -918,6 +1094,28 @@ return 0; } + while (lc->recovery_request_list) { + struct recovery_request *del; + + del = lc->recovery_request_list; + lc->recovery_request_list = del->next; + + pkg->r = del->region; + free(del); + + if (!log_test_bit(lc->sync_bits, pkg->r)) { + LOG_DBG("Assigning priority resync work to %u: %llu", + tfr->originator, pkg->r); +#ifdef DEBUG + LOG_DBG("Priority work remaining:"); + for (del = lc->recovery_request_list; del; del = del->next) + LOG_DBG(" %llu", del->region); +#endif + pkg->i = 1; + return 0; + } + } + pkg->r = find_next_zero_bit(lc->sync_bits, lc->region_count, lc->sync_search); @@ -973,7 +1171,6 @@ return -EINVAL; *sync_count = lc->sync_count; - LOG_DBG("sync_count = %llu", *sync_count); tfr->data_size = sizeof(*sync_count); @@ -1100,6 +1297,20 @@ return -EINVAL; *rtn = !log_test_bit(lc->sync_bits, region); + if (*rtn) { + struct recovery_request *rr; + LOG_DBG(" Region is busy recovering: %llu", region); + + /* Failure to allocated simply means we can't prioritize it */ + rr = malloc(sizeof(*rr)); + if (rr) { + rr->region = region; + rr->next = lc->recovery_request_list; + lc->recovery_request_list = rr; + } + } else + LOG_DBG(" Region is not recovering: %llu", region); + tfr->data_size = sizeof(*rtn); 
return 0; @@ -1228,7 +1439,7 @@ goto out; } - if (!lc->touched) + if (!(lc->touched & LOG_FLUSH)) goto out; if (lc->disk_fd >= 0) { @@ -1240,7 +1451,10 @@ LOG_DBG("Disk log written"); } - lc->touched = 0; + if (lc->touched & LOG_CHANGED) + LOG_ERROR("WARNING: Log has changed during a flush operation"); + + lc->touched &= ~LOG_FLUSH; /* FIXME: unlock */ out: @@ -1292,11 +1506,11 @@ if (!strncmp(which, "sync_bits", 9)) { memcpy(*buf, lc->sync_bits, bitset_size); - LOG_PRINT("storing sync_bits:"); + LOG_DBG("storing sync_bits:"); print_bits(*buf, bitset_size); } else if (!strncmp(which, "clean_bits", 9)) { memcpy(*buf, lc->clean_bits, bitset_size); - LOG_PRINT("storing clean_bits:"); + LOG_DBG("storing clean_bits:"); print_bits(*buf, bitset_size); } --- cluster/cmirror/src/Attic/local.c 2007/11/05 22:44:03 1.1.2.3 +++ cluster/cmirror/src/Attic/local.c 2007/11/08 22:16:53 1.1.2.4 @@ -30,12 +30,14 @@ r = recv(cn_fd, buf, sizeof(buf), 0); if (r < 0) { + LOG_ERROR("Failed to recv message from kernel"); r = -errno; goto out; } switch (((struct nlmsghdr *)buf)->nlmsg_type) { case NLMSG_ERROR: + LOG_ERROR("Unable to recv message from kernel: NLMSG_ERROR"); r = -EBADE; break; case NLMSG_DONE: @@ -77,8 +79,10 @@ * A failure to allocate space means the request is lost * The kernel must retry */ - if (!(*tfr = queue_remove(free_queue))) + if (!(*tfr = queue_remove(free_queue))) { + LOG_ERROR("Failed to get clog_tfr from free_queue"); return -ENOMEM; + } memset(*tfr, 0, sizeof(struct clog_tfr)); --- cluster/cmirror-kernel/src/dm-clog-tfr.c 2007/08/30 15:49:32 1.1.2.2 +++ cluster/cmirror-kernel/src/dm-clog-tfr.c 2007/11/08 22:16:53 1.1.2.3 @@ -81,6 +81,11 @@ } list_for_each_entry(pkg, &recieving_list, list) { + /* + DMINFO("Msg from userspace recieved [%s].", RQ_TYPE(tfr->request_type)); + DMINFO(" Seq # recieved: %llu Seq # wanted: %llu", + pkg->seq, tfr->seq); + */ if (tfr->seq == pkg->seq) { if (tfr->data_size > *(pkg->data_size)) { DMERR("Insufficient space to recieve package 
[%s]", --- cluster/cmirror-kernel/src/dm-clog-tfr.h 2007/11/03 18:37:48 1.1.2.2 +++ cluster/cmirror-kernel/src/dm-clog-tfr.h 2007/11/08 22:16:53 1.1.2.3 @@ -63,7 +63,7 @@ }; #ifdef __KERNEL__ -#define DM_MSG_PREFIX "clulog" +#define DM_MSG_PREFIX "dm-log-clustered" int dm_clog_tfr_init(void); void dm_clog_tfr_exit(void);