The included patch should fix
https://bugzilla.redhat.com/show_bug.cgi?id=506255.

David, I hope it will fix the problem for you.

It's based on the simple idea of appending the node's startup timestamp to
cpg_join (and joinlist) messages. If the received timestamp is larger than
the one we already know, the node was restarted without us noticing ->
deliver a leave event and then a join event. If the timestamp is the same
(or, in special cases, lower) -> a new cpg application joined -> send only
a join event.

Of course, the patch isn't quite that simple. Cpg_join messages are now
always sent in the larger format that includes the timestamp (btw. the
timestamp is a 64-bit value, because I expect a l(o^64)ng life for
corosync ;) ). On delivery, we test whether the message is larger than the
standard message. If it is -> we have a timestamp -> use it.

A bigger problem was the joinlist, because it's an array ... you will see
in the source. The solution is to send a special entry with pid 0 (no real
process should ever have pid 0), with the timestamp encoded in the group
name (ugly, but it seems to work).

Please comment if you can.

Regards,
  Honza
diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c
index 5c93586..c052db0 100644
--- a/trunk/services/cpg.c
+++ b/trunk/services/cpg.c
@@ -70,6 +70,8 @@ LOGSYS_DECLARE_SUBSYS ("CPG");
 
 #define GROUP_HASH_SIZE 32
 
+#define GROUP_SPECIAL_PID 0
+
 enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
 	MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
@@ -135,6 +137,7 @@ struct process_info {
 	unsigned int nodeid;
 	uint32_t pid;
 	mar_cpg_name_t group;
+	uint64_t timestamp;
 	struct list_head list; /* on the group_info members list */
 };
 DECLARE_LIST_INIT(process_info_list_head);
@@ -146,6 +149,9 @@ struct join_list_entry {
 
 static struct corosync_api_v1 *api = NULL;
 
+static uint64_t timestamp; /* time(NULL) taken in cpg_exec_init_fn; sent so peers can detect unnoticed restarts */
+static const char special_pid_check[9] = {'\xff', '\xee', '\xdd', 'c', 'h', 'e', 'c', 'k', '\0'}; /* group_name marker of the pid-0 timestamp entry */
+
 /*
  * Service Interfaces required by service_message_handler struct
  */
@@ -341,6 +347,14 @@ struct req_exec_cpg_procjoin {
 	mar_uint32_t reason __attribute__((aligned(8)));
 };
 
+struct req_exec_cpg_procjoin_with_ts {
+	coroipc_request_header_t header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t reason __attribute__((aligned(8)));
+	mar_uint64_t timestamp __attribute__((aligned(8)));
+};
+
 struct req_exec_cpg_mcast {
 	coroipc_request_header_t header __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
@@ -504,6 +518,9 @@ static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
 	logsys_subsys_init();
 #endif
 	api = corosync_api;
+
+	timestamp = (uint64_t) time(NULL);
+
 	return (0);
 }
 
@@ -524,7 +541,7 @@ static int cpg_lib_exit_fn (void *conn)
 
 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
 {
-	struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
+	struct req_exec_cpg_procjoin_with_ts req_exec_cpg_procjoin;
 	struct iovec req_exec_cpg_iovec;
 	int result;
 
@@ -532,6 +549,8 @@ static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *grou
 	req_exec_cpg_procjoin.pid = pid;
 	req_exec_cpg_procjoin.reason = reason;
 
+	req_exec_cpg_procjoin.timestamp = timestamp;
+
 	req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
 	req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
 
@@ -601,6 +620,12 @@ static void exec_cpg_procjoin_endian_convert (void *msg)
 	req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
 	swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
 	req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
+
+	if (req_exec_cpg_procjoin->header.size > sizeof (struct req_exec_cpg_procjoin)) {
+		/* Message is full message with ts */
+		((struct req_exec_cpg_procjoin_with_ts *) msg)->timestamp =
+			swab64(((struct req_exec_cpg_procjoin_with_ts *) msg)->timestamp);
+	}
 }
 
 static void exec_cpg_joinlist_endian_convert (void *msg_v)
@@ -614,6 +639,27 @@ static void exec_cpg_joinlist_endian_convert (void *msg_v)
 	while ((const char*)jle < msg + res->size) {
 		jle->pid = swab32(jle->pid);
 		swab_mar_cpg_name_t (&jle->group_name);
+
+		/* Special entry (pid 0 + marker): byte-swap the uint32_t size and uint64_t timestamp stored after the marker */
+		if (jle->pid == GROUP_SPECIAL_PID &&
+			memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) {
+			/* memcpy: value + sizeof (special_pid_check) is not guaranteed to be aligned for uint32_t/uint64_t */
+			uint32_t ainfo_size;
+			uint64_t ntimestamp;
+
+			memcpy (&ainfo_size, jle->group_name.value + sizeof (special_pid_check), sizeof (ainfo_size));
+			ainfo_size = swab32 (ainfo_size);
+			memcpy (jle->group_name.value + sizeof (special_pid_check), &ainfo_size, sizeof (ainfo_size));
+
+			if (ainfo_size >= sizeof (timestamp)) {
+				/* Size field says a timestamp follows */
+				memcpy (&ntimestamp, jle->group_name.value + sizeof (special_pid_check) +
+					sizeof (uint32_t), sizeof (ntimestamp));
+				ntimestamp = swab64 (ntimestamp);
+				memcpy (jle->group_name.value + sizeof (special_pid_check) +
+					sizeof (uint32_t), &ntimestamp, sizeof (ntimestamp));
+			}
+		}
 		jle++;
 	}
 }
@@ -662,6 +708,7 @@ static void do_proc_join(
 	const mar_cpg_name_t *name,
 	uint32_t pid,
 	unsigned int nodeid,
+	uint64_t ntimestamp,
 	int reason)
 {
 	struct process_info *pi;
@@ -680,6 +727,7 @@ static void do_proc_join(
 	}
 	pi->nodeid = nodeid;
 	pi->pid = pid;
+	pi->timestamp = ntimestamp;
 	memcpy(&pi->group, name, sizeof(*name));
 	list_init(&pi->list);
 
@@ -750,11 +798,56 @@ static void message_handler_req_exec_cpg_procjoin (
 	unsigned int nodeid)
 {
 	const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
+	uint64_t ntimestamp;
+	struct list_head *iter;
+	int node_failed = 0;
+	mar_cpg_address_t left_list[1];
 
 	log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
 
+	if (req_exec_cpg_procjoin->header.size > sizeof (struct req_exec_cpg_procjoin)) {
+		/* Message with timestamp */
+		ntimestamp = ((struct req_exec_cpg_procjoin_with_ts *)message)->timestamp;
+	} else {
+		ntimestamp = ~0;
+	}
+
+	/* Test if node didn't failed before */
+	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
+		struct process_info *pi = list_entry(iter, struct process_info, list);
+
+		if (pi->nodeid == nodeid && ntimestamp > pi->timestamp) {
+			/* Node failed*/
+			node_failed = 1;
+			break;
+		}
+	}
+
+	if (node_failed) {
+		/* Fake leave message */
+		log_printf(LOGSYS_LEVEL_DEBUG, "node %d failed. Delivering leave message\n", nodeid);
+
+		for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
+			struct process_info *pi = list_entry(iter, struct process_info, list);
+			iter = iter->next;
+
+			if (pi->nodeid == nodeid) {
+				left_list[0].nodeid = pi->nodeid;
+				left_list[0].pid = pi->pid;
+				left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN;
+
+				notify_lib_joinlist(&pi->group, NULL,
+							0, NULL,
+							1, left_list,
+							MESSAGE_RES_CPG_CONFCHG_CALLBACK);
+				list_del (&pi->list);
+				free (pi);
+			}
+		}
+	}
+
 	do_proc_join (&req_exec_cpg_procjoin->group_name,
-		req_exec_cpg_procjoin->pid, nodeid,
+		req_exec_cpg_procjoin->pid, nodeid, ntimestamp,
 		CONFCHG_CPG_REASON_JOIN);
 }
 
@@ -799,6 +892,7 @@ static void message_handler_req_exec_cpg_joinlist (
 	const char *message = message_v;
 	const coroipc_response_header_t *res = (const coroipc_response_header_t *)message;
 	const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
+	uint64_t ntimestamp;
 
 	log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node %x\n",
 		nodeid);
@@ -808,9 +902,36 @@ static void message_handler_req_exec_cpg_joinlist (
 		return;
 	}
 
+	jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
+	ntimestamp = ~0;
+
+	/* First pass: find the pid-0 entry carrying the sender's startup timestamp */
+	while ((const char*)jle < message + res->size) {
+		if (jle->pid == GROUP_SPECIAL_PID &&
+			memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) {
+			/* Payload after the marker: uint32_t size, then uint64_t timestamp (read via memcpy, may be unaligned) */
+			uint32_t ainfo_size;
+			memcpy (&ainfo_size, jle->group_name.value + sizeof (special_pid_check), sizeof (ainfo_size));
+			if (ainfo_size >= sizeof (timestamp)) {
+				memcpy (&ntimestamp, jle->group_name.value + sizeof (special_pid_check) +
+					sizeof (uint32_t), sizeof (ntimestamp));
+				log_printf(LOGSYS_LEVEL_DEBUG, "joinlist with timestamp %llu\n",
+					(unsigned long long)ntimestamp);
+			}
+		}
+		jle++;
+	}
+
+	jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
 	while ((const char*)jle < message + res->size) {
+		if (jle->pid == GROUP_SPECIAL_PID &&
+			memcmp (jle->group_name.value, special_pid_check, sizeof (special_pid_check)) == 0) {
+			/* Timestamp carrier, not a real process: never register it as a group member */
+			jle++;
+			continue;
+		}
 		do_proc_join (&jle->group_name, jle->pid, nodeid,
-			CONFCHG_CPG_REASON_NODEUP);
+			ntimestamp, CONFCHG_CPG_REASON_NODEUP);
 		jle++;
 	}
 }
@@ -873,7 +994,7 @@ static int cpg_exec_send_joinlist(void)
 	if (!count)
 		return 0;
 
-	buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * count);
+	buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * (count + 1));
 	if (!buf) {
 		log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
 		return -1;
@@ -892,8 +1013,15 @@ static int cpg_exec_send_joinlist(void)
 		}
 	}
 
+	/* Append the special entry: pid 0, marker, uint32_t size, uint64_t timestamp (zeroed so no stale stack bytes are sent) */
+	memset (jle, 0, sizeof (*jle));
+	jle->pid = GROUP_SPECIAL_PID;
+	jle->group_name.length = sizeof (special_pid_check);
+	memcpy(jle->group_name.value, special_pid_check, sizeof (special_pid_check));
+	memcpy(jle->group_name.value + sizeof (special_pid_check), &(uint32_t){ sizeof (timestamp) }, sizeof (uint32_t));
+	memcpy(jle->group_name.value + sizeof (special_pid_check) + sizeof (uint32_t), &timestamp, sizeof (timestamp));
 	res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
-	res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * count;
+	res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * (count + 1);
 
 	req_exec_cpg_iovec.iov_base = buf;
 	req_exec_cpg_iovec.iov_len = res->size;
_______________________________________________
Openais mailing list
Openais@lists.linux-foundation.org
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to