Included patch should fix https://bugzilla.redhat.com/show_bug.cgi?id=506255 .
David, I hope it will fix the problem for you. It's based on the simple idea of adding the node startup timestamp at the end of cpg_join (and joinlist) messages. If the timestamp is larger than the old timestamp we know, the node was restarted and we didn't notice -> deliver a leave event and then a join event. If the timestamp is the same (or, in special cases, lower) -> a new cpg app joined -> send only a join event. Of course, the patch isn't quite that simple. Cpg_join messages are always sent as larger messages carrying the timestamp (btw. the timestamp is a 64-bit value, because I expect a l(o^64)ng life of corosync ;) ). On delivery, we test whether the message is larger than the standard message. If it is -> we have a ts -> use it. A bigger problem was the joinlist, because it's an array ... you will see in the source. The solution is to send a special entry with pid 0 (a process should never have pid 0) and the timestamp encoded in the name (ugly, but it seems to work). Please comment, if you can. Regards, Honza
diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c index 5c93586..c052db0 100644 --- a/trunk/services/cpg.c +++ b/trunk/services/cpg.c @@ -70,6 +70,8 @@ LOGSYS_DECLARE_SUBSYS ("CPG"); #define GROUP_HASH_SIZE 32 +#define GROUP_SPECIAL_PID 0 + enum cpg_message_req_types { MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0, MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1, @@ -135,6 +137,7 @@ struct process_info { unsigned int nodeid; uint32_t pid; mar_cpg_name_t group; + uint64_t timestamp; struct list_head list; /* on the group_info members list */ }; DECLARE_LIST_INIT(process_info_list_head); @@ -146,6 +149,9 @@ struct join_list_entry { static struct corosync_api_v1 *api = NULL; +static uint64_t timestamp; +static char special_pid_check[9] = {0xff, 0xee, 0xdd, 'c', 'h', 'e', 'c', 'k', 0x00}; + /* * Service Interfaces required by service_message_handler struct */ @@ -341,6 +347,14 @@ struct req_exec_cpg_procjoin { mar_uint32_t reason __attribute__((aligned(8))); }; +struct req_exec_cpg_procjoin_with_ts { + coroipc_request_header_t header __attribute__((aligned(8))); + mar_cpg_name_t group_name __attribute__((aligned(8))); + mar_uint32_t pid __attribute__((aligned(8))); + mar_uint32_t reason __attribute__((aligned(8))); + mar_uint64_t timestamp __attribute__((aligned(8))); +}; + struct req_exec_cpg_mcast { coroipc_request_header_t header __attribute__((aligned(8))); mar_cpg_name_t group_name __attribute__((aligned(8))); @@ -504,6 +518,9 @@ static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api) logsys_subsys_init(); #endif api = corosync_api; + + timestamp = (uint64_t) time(NULL); + return (0); } @@ -524,7 +541,7 @@ static int cpg_lib_exit_fn (void *conn) static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason) { - struct req_exec_cpg_procjoin req_exec_cpg_procjoin; + struct req_exec_cpg_procjoin_with_ts req_exec_cpg_procjoin; struct iovec req_exec_cpg_iovec; int result; @@ -532,6 +549,8 @@ static int cpg_node_joinleave_send 
(unsigned int pid, const mar_cpg_name_t *grou req_exec_cpg_procjoin.pid = pid; req_exec_cpg_procjoin.reason = reason; + req_exec_cpg_procjoin.timestamp = timestamp; + req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin); req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn); @@ -601,6 +620,12 @@ static void exec_cpg_procjoin_endian_convert (void *msg) req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid); swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name); req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason); + + if (req_exec_cpg_procjoin->header.size > sizeof (struct req_exec_cpg_procjoin)) { + /* Message is full message with ts */ + ((struct req_exec_cpg_procjoin_with_ts *) msg)->timestamp = + swab64(((struct req_exec_cpg_procjoin_with_ts *) msg)->timestamp); + } } static void exec_cpg_joinlist_endian_convert (void *msg_v) @@ -614,6 +639,27 @@ static void exec_cpg_joinlist_endian_convert (void *msg_v) while ((const char*)jle < msg + res->size) { jle->pid = swab32(jle->pid); swab_mar_cpg_name_t (&jle->group_name); + + /* Test, if this pid is special pid */ + if (jle->pid == GROUP_SPECIAL_PID && + memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) { + + uint32_t ainfo_size; + uint64_t ntimestamp; + + ainfo_size = *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check))); + ainfo_size = swab32 (ainfo_size); + *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check))) = ainfo_size; + + if (ainfo_size >= sizeof (timestamp)) { + /* We have timestamp */ + ntimestamp = *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) + + sizeof (uint32_t))); + ntimestamp = swab64 (ntimestamp); + *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) + + sizeof (uint32_t))) = ntimestamp; + } + } jle++; } } @@ -662,6 +708,7 @@ static void do_proc_join( const mar_cpg_name_t *name, uint32_t pid, unsigned int nodeid, + uint64_t ntimestamp, int reason) { 
struct process_info *pi; @@ -680,6 +727,7 @@ static void do_proc_join( } pi->nodeid = nodeid; pi->pid = pid; + pi->timestamp = ntimestamp; memcpy(&pi->group, name, sizeof(*name)); list_init(&pi->list); @@ -750,11 +798,56 @@ static void message_handler_req_exec_cpg_procjoin ( unsigned int nodeid) { const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message; + uint64_t ntimestamp; + struct list_head *iter; + int node_failed = 0; + mar_cpg_address_t left_list[1]; log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid); + if (req_exec_cpg_procjoin->header.size > sizeof (struct req_exec_cpg_procjoin)) { + /* Message with timestamp */ + ntimestamp = ((struct req_exec_cpg_procjoin_with_ts *)message)->timestamp; + } else { + ntimestamp = ~0; + } + + /* Test if node didn't failed before */ + for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) { + struct process_info *pi = list_entry(iter, struct process_info, list); + + if (pi->nodeid == nodeid && ntimestamp > pi->timestamp) { + /* Node failed*/ + node_failed = 1; + break; + } + } + + if (node_failed) { + /* Fake leave message */ + log_printf(LOGSYS_LEVEL_DEBUG, "node %d failed. 
Delivering leave message\n", nodeid); + + for (iter = process_info_list_head.next; iter != &process_info_list_head; ) { + struct process_info *pi = list_entry(iter, struct process_info, list); + iter = iter->next; + + if (pi->nodeid == nodeid) { + left_list[0].nodeid = pi->nodeid; + left_list[0].pid = pi->pid; + left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN; + + notify_lib_joinlist(&pi->group, NULL, + 0, NULL, + 1, left_list, + MESSAGE_RES_CPG_CONFCHG_CALLBACK); + list_del (&pi->list); + free (pi); + } + } + } + do_proc_join (&req_exec_cpg_procjoin->group_name, - req_exec_cpg_procjoin->pid, nodeid, + req_exec_cpg_procjoin->pid, nodeid, ntimestamp, CONFCHG_CPG_REASON_JOIN); } @@ -799,6 +892,7 @@ static void message_handler_req_exec_cpg_joinlist ( const char *message = message_v; const coroipc_response_header_t *res = (const coroipc_response_header_t *)message; const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t)); + uint64_t ntimestamp; log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node %x\n", nodeid); @@ -808,9 +902,36 @@ static void message_handler_req_exec_cpg_joinlist ( return; } + jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t)); + + ntimestamp = ~0; + + /* We first try to found, if we have timestamp or not */ + while ((const char*)jle < message + res->size) { + if (jle->pid == GROUP_SPECIAL_PID && + memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) { + + /* It looks like we have special pid -> timestamp*/ + uint32_t ainfo_size; + + ainfo_size = *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check))); + + if (ainfo_size >= sizeof (timestamp)) { + /* We have timestamp */ + ntimestamp = *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) + + sizeof (uint32_t))); + + log_printf(LOGSYS_LEVEL_DEBUG, "joinlist with timestamp %lld\n", ntimestamp); + } + } + jle++; + } + + jle = (const struct 
join_list_entry *)(message + sizeof(coroipc_response_header_t)); + while ((const char*)jle < message + res->size) { do_proc_join (&jle->group_name, jle->pid, nodeid, - CONFCHG_CPG_REASON_NODEUP); + ntimestamp, CONFCHG_CPG_REASON_NODEUP); jle++; } } @@ -873,7 +994,7 @@ static int cpg_exec_send_joinlist(void) if (!count) return 0; - buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * count); + buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * (count + 1)); if (!buf) { log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer"); return -1; @@ -892,8 +1013,15 @@ static int cpg_exec_send_joinlist(void) } } + /* Time to add special pid with timestamp */ + jle->pid = GROUP_SPECIAL_PID; + jle->group_name.length = sizeof (special_pid_check); + memcpy(jle->group_name.value, special_pid_check, sizeof (special_pid_check)); + *((uint32_t *)(jle->group_name.value + sizeof (special_pid_check))) = sizeof (timestamp); + *((uint64_t *)(jle->group_name.value + sizeof (special_pid_check) + sizeof (uint32_t))) = timestamp; + res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST); - res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * count; + res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * (count + 1); req_exec_cpg_iovec.iov_base = buf; req_exec_cpg_iovec.iov_len = res->size;
_______________________________________________ Openais mailing list Openais@lists.linux-foundation.org https://lists.linux-foundation.org/mailman/listinfo/openais