The current totempg layer suffers from a race condition whereby one ipc
connection could obtain information about how much room is available to
fill in multicasts while another actually fills them in.  Then the
second ipc connection attempts to multicast, but the multicast queue is
full.

This patch removes the race by allowing ipc connections to reserve space
in the outbound totem queue and essentially "lock up" their transmission
spot.

Regards
-steve
Index: exec/evs.c
===================================================================
--- exec/evs.c	(revision 1735)
+++ exec/evs.c	(working copy)
@@ -374,7 +374,6 @@
 	struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined;
 	struct iovec req_exec_evs_mcast_iovec[3];
 	struct req_exec_evs_mcast req_exec_evs_mcast;
-	int send_ok = 0;
 	int res;
 	struct evs_pd *evs_pd = (struct evs_pd *)openais_conn_private_data_get (conn);
 
@@ -393,8 +392,6 @@
 	req_exec_evs_mcast_iovec[1].iov_len = evs_pd->group_entries * sizeof (struct evs_group);
 	req_exec_evs_mcast_iovec[2].iov_base = &req_lib_evs_mcast_joined->msg;
 	req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len;
-// TODO this doesn't seem to work for some reason	
-	send_ok = totempg_groups_send_ok_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3);
 
 	res = totempg_groups_mcast_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED);
 		// TODO
@@ -420,7 +417,6 @@
 	struct iovec req_exec_evs_mcast_iovec[3];
 	struct req_exec_evs_mcast req_exec_evs_mcast;
 	char *msg_addr;
-	int send_ok = 0;
 	int res;
 
 	req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast) +
@@ -443,8 +439,6 @@
 	req_exec_evs_mcast_iovec[2].iov_base = msg_addr;
 	req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len;
 	
-// TODO this is wacky
-	send_ok = totempg_groups_send_ok_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3);
 	res = totempg_groups_mcast_joined (openais_group_handle, req_exec_evs_mcast_iovec, 3, TOTEMPG_AGREED);
 	if (res == 0) {
 		error = EVS_OK;
Index: exec/ipc.c
===================================================================
--- exec/ipc.c	(revision 1736)
+++ exec/ipc.c	(working copy)
@@ -296,8 +296,8 @@
 	struct res_overlay res_overlay;
 	struct iovec send_ok_joined_iovec;
 	int send_ok = 0;
+	int reserved_msgs = 0;
 	int flow_control = 0;
-	int send_ok_joined = 0;
 
 	for (;;) {
 		sop.sem_num = 0;
@@ -323,7 +323,8 @@
 
 		send_ok_joined_iovec.iov_base = (char *)header;
 		send_ok_joined_iovec.iov_len = header->size;
-		send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
+		reserved_msgs = totempg_groups_joined_reserve (
+			openais_group_handle,
 			&send_ok_joined_iovec, 1);
 
 		/* Sanity check service and header.id */
@@ -349,7 +350,7 @@
 
 		} else if(send_ok
 		   && flow_control == OPENAIS_FLOW_CONTROL_REQUIRED
-		   && (send_ok_joined == 0 || sync_in_process() != 0)) {
+		   && (reserved_msgs == 0 || sync_in_process() != 0)) {
 		    send_ok = 0;
 		}
 
@@ -367,6 +368,8 @@
 			openais_response_send (conn_info, &res_overlay, 
 				res_overlay.header.size);
 		}
+
+		totempg_groups_joined_release (reserved_msgs);
 		openais_conn_refcount_dec (conn);
 	}
 	pthread_exit (0);
Index: exec/totempg.c
===================================================================
--- exec/totempg.c	(revision 1735)
+++ exec/totempg.c	(working copy)
@@ -144,6 +144,8 @@
 
 static int mcast_packed_msg_count = 0;
 
+static int totempg_reserved = 0;
+
 /*
  * Function and data used to log messages
  */
@@ -221,8 +223,6 @@
 	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 
-static int send_ok (int msg_size);
-
 static unsigned char next_fragment = 1;
 
 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -234,6 +234,10 @@
 #define log_printf(level, format, args...) \
     totempg_log_printf (__FILE__, __LINE__, level, format, ##args)
 
+static int msg_count_send_ok (int msg_count);
+
+static int byte_count_send_ok (int byte_count);
+
 static struct assembly *assembly_ref (unsigned int nodeid)
 {
 	struct assembly *assembly;
@@ -757,7 +761,7 @@
 		total_size += iovec[i].iov_len;
 	}
 
-	if (send_ok (total_size + sizeof(unsigned short) *
+	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
 		(mcast_packed_msg_count+1)) == 0) {
 
 		pthread_mutex_unlock (&mcast_msg_mutex);
@@ -880,25 +884,46 @@
 /*
  * Determine if a message of msg_size could be queued
  */
-#define FUZZY_AVAIL_SUBTRACT 5
-static int send_ok (
-	int msg_size)
+static int msg_count_send_ok (
+	int msg_count)
 {
 	int avail = 0;
-	int total;
 
-	avail = totemmrp_avail () - FUZZY_AVAIL_SUBTRACT;
+	avail = totemmrp_avail () - totempg_reserved - 1;
 	
-	/*
-	 * msg size less then totempg_totem_config->net_mtu - 25 will take up
-	 * a full message, so add +1
-	 * totempg_totem_config->net_mtu - 25 is for the totempg_mcast header
-	 */
-	total = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1; 
+	return (avail > msg_count);
+}
 
-	return (avail >= total);
+static int byte_count_send_ok (
+	int byte_count)
+{
+	int msg_count = 0;
+	int avail = 0;
+
+	avail = totemmrp_avail () - totempg_reserved - 1;
+
+	msg_count = (byte_count / (totempg_totem_config->net_mtu - 25)) + 1; 
+
+	return (avail >= msg_count);
 }
 
+static int send_reserve (
+	int msg_size)
+{
+	unsigned int msg_count = 0;
+
+	msg_count = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1; 
+	totempg_reserved += msg_count;
+
+	return (msg_count);
+}
+
+static void send_release (
+	int msg_count)
+{
+	totempg_reserved -= msg_count;
+}
+
 int totempg_callback_token_create (
 	void **handle_out,
 	enum totem_callback_token_type type,
@@ -1083,7 +1108,7 @@
 	return (res);
 }
 
-int totempg_groups_send_ok_joined (
+int totempg_groups_joined_reserve (
 	totempg_groups_handle handle,
 	struct iovec *iovec,
 	int iov_len)
@@ -1092,6 +1117,7 @@
 	unsigned int size = 0;
 	unsigned int i;
 	unsigned int res;
+	unsigned int reserved = 0;
 
 	pthread_mutex_lock (&totempg_mutex);
 	pthread_mutex_lock (&mcast_msg_mutex);
@@ -1108,22 +1134,30 @@
 		size += iovec[i].iov_len;
 	}
 
-	/*
-	 * 2000 is a number chosen to represent the maximum size of
-	 * the totempg header used in the transmission of messages
-	 */
-	size += 2000;
+	reserved = send_reserve (size);
+	if (msg_count_send_ok (reserved) == 0) {
+		send_release (reserved);
+		reserved = 0;
+	}
 
-	res = send_ok (size);
-
 	hdb_handle_put (&totempg_groups_instance_database, handle);
 
 error_exit:
 	pthread_mutex_unlock (&mcast_msg_mutex);
 	pthread_mutex_unlock (&totempg_mutex);
-	return (res);
+	return (reserved);
 }
 
+void totempg_groups_joined_release (int msg_count)
+{
+	
+	pthread_mutex_lock (&totempg_mutex);
+	pthread_mutex_lock (&mcast_msg_mutex);
+	send_release (msg_count);
+	pthread_mutex_unlock (&mcast_msg_mutex);
+	pthread_mutex_unlock (&totempg_mutex);
+}
+
 int totempg_groups_mcast_groups (
 	totempg_groups_handle handle,
 	int guarantee,
@@ -1199,7 +1233,7 @@
 		size += iovec[i].iov_len;
 	}
 
-	res = send_ok (size);
+	res = byte_count_send_ok (size);
 	 
 	hdb_handle_put (&totempg_groups_instance_database, handle);
 error_exit:
Index: exec/totempg.h
===================================================================
--- exec/totempg.h	(revision 1735)
+++ exec/totempg.h	(working copy)
@@ -111,10 +111,13 @@
 	int iov_len,
 	int guarantee);
 
-extern int totempg_groups_send_ok_joined (
+extern int totempg_groups_joined_reserve (
 	totempg_groups_handle handle,
 	struct iovec *iovec,
 	int iov_len);
+
+extern void totempg_groups_joined_release (
+	int msg_count);
 	
 extern int totempg_groups_mcast_groups (
 	totempg_groups_handle handle,
Index: exec/lck.c
===================================================================
--- exec/lck.c	(revision 1735)
+++ exec/lck.c	(working copy)
@@ -504,11 +504,7 @@
 	iovec.iov_base = (char *)&req_exec_lck_resourceclose;
 	iovec.iov_len = sizeof (req_exec_lck_resourceclose);
 
-	if (totempg_groups_send_ok_joined (openais_group_handle, &iovec, 1)) {
-		assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
-		return (0);
-	}
-
-	return (-1);
+	assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED) == 0);
+	return (0);
 }
 
@@ -1356,9 +1352,7 @@
 		iovecs[0].iov_base = (char *)&req_exec_lck_resourceclose;
 		iovecs[0].iov_len = sizeof (req_exec_lck_resourceclose);
 
-		if (totempg_groups_send_ok_joined (openais_group_handle, iovecs, 1)) {
-			assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
-		}
+		assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1, TOTEMPG_AGREED) == 0);
 	}
 	else {
 		log_printf (LOG_LEVEL_ERROR, "#### LCK: Could Not Find the Checkpoint to close so Returning Error. ####\n");
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to