And now, the missing files: cpgbench.c and the patch are attached.

* Mathieu Virbel <[email protected]> [2009-01-08 11:09:31]:

> Hi everyone,
> 
> I've hit a bug while using CPG, though it may not really be related to CPG.
> While developing a future application based on CPG, I stress-tested the
> service, and it failed too easily.
> 
> The failure happens when CPG messages are too big and some threads are
> consuming CPU. When this happens, corosync fails to send
> messages and shows:
> [TOTEM ] The token was lost in the OPERATIONAL state.
> It then loops on this message while the test is running.
> 
> To make this easier to test, I've modified cpgbench to:
> - add a "listener" mode (it only listens and sends no messages)
> - add options to configure the starting message size and the growth step
> - add an option to launch background threads that consume CPU
> 
> To reproduce the bug, you need 2 nodes. On the first, start with:
> ./cpgbench -l
> 
> On the second:
> ./cpgbench -t 10 -s 100000
> (start 10 CPU-consuming threads and set the CPG message size to 100 KB).
> 
> You may need to increase these values if you have a fast CPU :)
> (this failed on CPU: VIA Samuel 2 (400.91-MHz 686-class CPU))
> 
> 
> Can someone help us?
> 
> Regards,
> 
> --
> Mathieu Virbel
> 
> NETASQ - We secure IT
> 3 rue Archimède
> 59650 Villeneuve d'Ascq
> France
> 
> 
> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais
#define _BSD_SOURCE
/*
 * Copyright (c) 2006 Red Hat, Inc.
 *
 * All rights reserved.
 *
 * Author: Steven Dake ([email protected])
 *
 * This software licensed under BSD license, the text of which follows:
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of the MontaVista Software, Inc. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/un.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>

#include <corosync/corotypes.h>
#include <corosync/cpg.h>

#ifdef COROSYNC_SOLARIS
/*
 * Fallback definition of timersub, compiled only when COROSYNC_SOLARIS is
 * defined.
 *
 * Stores (a - b) into result, where a, b and result are pointers to
 * structures with tv_sec and tv_usec members. When the microsecond
 * difference is negative, one second is borrowed so that tv_usec ends up
 * non-negative.
 *
 * Each argument is expanded more than once, so pass expressions without
 * side effects.
 */
#define timersub(a, b, result)						\
    do {								\
	(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;			\
	(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;		\
	if ((result)->tv_usec < 0) {					\
	    --(result)->tv_sec;						\
	    (result)->tv_usec += 1000000;				\
	}								\
    } while (0)
#endif

/*
 * Set to 1 by sigalrm_handler when the benchmark timer fires and polled by
 * cpg_benchmark. It is written from a signal handler, so it must be a
 * volatile sig_atomic_t rather than a plain int.
 */
volatile sig_atomic_t alarm_notice;
int listen_only		= 0;	/* non-zero: only receive, never multicast (-l) */
int size_grow		= 1000;	/* bytes added to the message size after each pass (-g) */
int thread_count	= 0;	/* number of background threads still to start (-t) */
pthread_t	thread	= {0};	/* id of the most recently created background thread */

/*
 * Configuration-change callback registered in the callbacks table.
 * The benchmark does not react to membership changes, so every argument
 * is deliberately ignored.
 */
void cpg_bm_confchg_fn (
	cpg_handle_t handle,
	struct cpg_name *group_name,
	struct cpg_address *member_list, int member_list_entries,
	struct cpg_address *left_list, int left_list_entries,
	struct cpg_address *joined_list, int joined_list_entries)
{
	(void)handle;
	(void)group_name;
	(void)member_list;
	(void)member_list_entries;
	(void)left_list;
	(void)left_list_entries;
	(void)joined_list;
	(void)joined_list_entries;
}

/*
 * Number of messages delivered during the current benchmark pass:
 * incremented by cpg_bm_deliver_fn and reset by cpg_benchmark.
 */
unsigned int write_count;

/*
 * Message-delivery callback registered in the callbacks table.
 * The payload and sender are ignored; each delivered message only bumps
 * the global write_count.
 */
void cpg_bm_deliver_fn (
        cpg_handle_t handle,
        struct cpg_name *group_name,
        uint32_t nodeid,
        uint32_t pid,
        void *msg,
        int msg_len)
{
	(void)handle;
	(void)group_name;
	(void)nodeid;
	(void)pid;
	(void)msg;
	(void)msg_len;

	write_count += 1;
}

/* Callback table handed to cpg_initialize in main. */
cpg_callbacks_t callbacks = {
	/* membership changes: ignored */
	.cpg_confchg_fn = cpg_bm_confchg_fn,
	/* message delivery: counts messages */
	.cpg_deliver_fn = cpg_bm_deliver_fn
};

/*
 * Payload buffer multicast by cpg_benchmark. main resizes it with realloc
 * before each pass and frees it before exiting.
 */
char *data = NULL;

/*
 * Run one benchmark pass, bounded by a 10 second alarm.
 *
 * Unless listen_only is set, the first write_size bytes of the global data
 * buffer are multicast to the joined group whenever CPG flow control is
 * disabled. Every iteration dispatches pending callbacks, and
 * cpg_bm_deliver_fn counts each delivered message in write_count. When the
 * alarm fires (alarm_notice becomes non-zero) the message count, runtime
 * and throughput are printed to stdout.
 *
 * handle:     CPG handle used for multicast and dispatch
 * write_size: number of bytes sent per multicast
 *
 * Exits the process with status 1 if cpg_dispatch fails.
 */
void cpg_benchmark (
	cpg_handle_t handle,
	int write_size)
{
	struct timeval tv1, tv2, tv_elapsed;
	struct iovec iov;
	unsigned int res;
	cpg_flow_control_state_t flow_control_state;
	double elapsed_sec;
	double msgs_per_sec = 0.0;
	double mbytes_per_sec = 0.0;

	alarm_notice = 0;
	iov.iov_base = data;
	iov.iov_len = write_size;

	write_count = 0;
	alarm (10);

	gettimeofday (&tv1, NULL);
	do {
		if (!listen_only) {
			/*
			 * Test cpg message write. Only trust flow_control_state
			 * when the query itself succeeded.
			 */
			res = cpg_flow_control_state_get (handle, &flow_control_state);
			if (res == CS_OK &&
			    flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
				/*
				 * Retry while the library asks us to, but never
				 * past the end of the measurement window:
				 * otherwise a persistent TRY_AGAIN would spin
				 * here forever without dispatching.
				 */
				do {
					res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
				} while (res == CS_ERR_TRY_AGAIN && alarm_notice == 0);
			}
		}
		res = cpg_dispatch (handle, CS_DISPATCH_ALL);
		if (res != CS_OK) {
			printf ("cpg dispatch returned error %u\n", res);
			exit (1);
		}
	} while (alarm_notice == 0);
	gettimeofday (&tv2, NULL);
	timersub (&tv2, &tv1, &tv_elapsed);

	elapsed_sec = tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0);
	if (elapsed_sec > 0.0) {
		/* Guard against a zero interval so we never divide by zero. */
		msgs_per_sec = (double)write_count / elapsed_sec;
		mbytes_per_sec = ((double)write_count * (double)write_size) /
			(elapsed_sec * 1000000.0);
	}

	printf ("%5u messages received ", write_count);
	printf ("%5d bytes per write ", write_size);
	printf ("%7.3f Seconds runtime ", elapsed_sec);
	printf ("%9.3f TP/s ", msgs_per_sec);
	printf ("%7.3f MB/s.\n", mbytes_per_sec);
}

/*
 * SIGALRM handler installed by main: raises the alarm_notice flag that
 * ends the current cpg_benchmark pass. The signal number is unused.
 */
void sigalrm_handler (int num)
{
	(void)num;
	alarm_notice = 1;
}

/* Name of the CPG group joined by main; length excludes the trailing NUL. */
static struct cpg_name group_name = {
	.value = "cpg_bm",
	.length = sizeof ("cpg_bm") - 1
};

/*
 * Entry point of a background CPU-burning thread (started by main with
 * pthread_create). Announces itself on stdout, then spins forever; it
 * never returns.
 *
 * data: unused thread argument.
 *
 * The counter is unsigned so wrap-around is well defined (a signed counter
 * would overflow, which is undefined behavior), and volatile so every
 * increment is a real access the compiler must keep.
 */
void *thread_run(void *data)
{
	volatile unsigned int spin = 0;

	(void)data;
	printf("Start background thread\n");
	for (;;) {
		spin++;
	}
	return NULL;
}

/* Print the command-line help text to stdout. */
void usage()
{
	static const char *const help_lines[] = {
		"Usage: ./cpgbench [-lsgth]\n",
		" -l           start in listener mode (no message sent)\n",
		" -s <size>    start with message of <size>\n",
		" -g <size>    grow each iteration with <size>\n",
		" -t <number>  start <number> threads in background\n",
		"\n"
	};
	size_t idx;

	for (idx = 0; idx < sizeof help_lines / sizeof help_lines[0]; idx++) {
		fputs (help_lines[idx], stdout);
	}
}

/*
 * Parse a decimal option argument.
 *
 * text:  the option argument to parse
 * min:   smallest accepted value
 * max:   largest accepted value
 * value: receives the parsed number on success
 *
 * Returns 0 on success, -1 when text is not a complete base-10 integer
 * within [min, max]; *value is left untouched on failure.
 */
static int parse_int_option (const char *text, int min, int max, int *value)
{
	char *end;
	long parsed;

	errno = 0;
	parsed = strtol (text, &end, 10);
	if (end == text || *end != '\0' || errno == ERANGE ||
	    parsed < min || parsed > max) {
		return -1;
	}
	*value = (int)parsed;
	return 0;
}

/*
 * cpgbench entry point.
 *
 * Options:
 *   -l           listener mode: dispatch only, never multicast
 *   -s <size>    initial message size in bytes (at least 1)
 *   -g <size>    bytes added to the message size after each pass
 *   -t <number>  number of CPU-burning background threads (sender mode only)
 *   -h           print usage
 *
 * Initializes CPG, joins the benchmark group, optionally starts the
 * background threads, then runs up to 50 cpg_benchmark passes, growing the
 * message by size_grow bytes after each one.
 *
 * Returns 0 on success and 1 on a usage error; exits with status 1 when a
 * CPG call fails.
 */
int main (int argc, char **argv) {
	cpg_handle_t handle;
	char *newptr;
	unsigned int size = 1;
	int i;
	unsigned int res;
	int ch;
	int value;
	int err;

	while ((ch = getopt (argc, argv, "hls:g:t:")) != -1) {
		switch (ch) {
			case 'l':
				listen_only = 1;
				break;
			case 's':
				/* A zero size would make realloc(data, 0) look like a failure. */
				if (parse_int_option (optarg, 1, INT_MAX, &value) != 0) {
					printf ("invalid start size '%s'\n", optarg);
					usage();
					return 1;
				}
				size = (unsigned int)value;
				printf("[INFO] set start size to %u\n", size);
				break;
			case 'g':
				if (parse_int_option (optarg, 0, INT_MAX, &value) != 0) {
					printf ("invalid grow size '%s'\n", optarg);
					usage();
					return 1;
				}
				size_grow = value;
				printf("[INFO] set grow size to %d\n", size_grow);
				break;
			case 't':
				if (parse_int_option (optarg, 0, INT_MAX, &value) != 0) {
					printf ("invalid thread count '%s'\n", optarg);
					usage();
					return 1;
				}
				thread_count = value;
				printf("[INFO] start %d in background\n", thread_count);
				break;
			case 'h':
			case '?':
			default:
				usage();
				return 1;
		}
	}

	signal (SIGALRM, sigalrm_handler);
	res = cpg_initialize (&handle, &callbacks);
	if (res != CS_OK) {
		printf ("cpg_initialize failed with result %u\n", res);
		exit (1);
	}

	res = cpg_join (handle, &group_name);
	if (res != CS_OK) {
		printf ("cpg_join failed with result %u\n", res);
		exit (1);
	}

	if (!listen_only) {
		/*
		 * Start the background threads. A creation failure is reported
		 * and the benchmark carries on with the threads started so far.
		 */
		while (thread_count-- > 0) {
			err = pthread_create (&thread, NULL, thread_run, NULL);
			if (err != 0) {
				printf ("pthread_create failed: %s\n", strerror (err));
				break;
			}
		}
	}

	for (i = 0; i < 50; i++) { /* number of passes; size grows by size_grow each time */
		/* cpg_benchmark takes the size as an int. */
		if (size > (unsigned int)INT_MAX) {
			printf ("message size %u is too large, abort.\n", size);
			break;
		}

		newptr = realloc (data, size);
		if (newptr == NULL) {
			printf ("cannot realloc data to %u, abort.\n", size);
			break;
		}
		data = newptr;

		cpg_benchmark (handle, size);
		size += size_grow;
	}

	res = cpg_finalize (handle);
	if (res != CS_OK) {
		printf ("cpg_finalize failed with result %u\n", res);
		exit (1);
	}

	free (data);
	data = NULL;

	return (0);
}
Index: test/cpgbench.c
===================================================================
--- test/cpgbench.c	(révision 1718)
+++ test/cpgbench.c	(copie de travail)
@@ -49,6 +49,7 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
+#include <pthread.h>
 
 #include <corosync/corotypes.h>
 #include <corosync/cpg.h>
@@ -66,6 +67,10 @@
 #endif
 
 int alarm_notice;
+int listen_only		= 0;
+int size_grow		= 1000;
+int thread_count	= 0;
+pthread_t	thread	= {0};
 
 void cpg_bm_confchg_fn (
 	cpg_handle_t handle,
@@ -94,7 +99,7 @@
 	.cpg_confchg_fn		= cpg_bm_confchg_fn
 };
 
-char data[500000];
+char *data = NULL;
 
 void cpg_benchmark (
 	cpg_handle_t handle,
@@ -114,15 +119,18 @@
 
 	gettimeofday (&tv1, NULL);
 	do {
-		/*
-		 * Test checkpoint write
-		 */
-		cpg_flow_control_state_get (handle, &flow_control_state);
-		if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+		if ( !listen_only )
+		{
+			/*
+			 * Test cpg message write
+			 */
+			cpg_flow_control_state_get (handle, &flow_control_state);
+			if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
 retry:
-			res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
-			if (res == CS_ERR_TRY_AGAIN) {
-				goto retry;
+				res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
+				if (res == CS_ERR_TRY_AGAIN) {
+					goto retry;
+				}
 			}
 		}
 		res = cpg_dispatch (handle, CS_DISPATCH_ALL);
@@ -154,11 +162,56 @@
 	.length = 6
 };
 
-int main (void) {
+void *thread_run(void *data)
+{
+	int a = 0;
+	printf("Start background thread\n");
+	while (1) a++;
+	return NULL;
+}
+
+void usage()
+{
+	printf("Usage: ./cpgbench [-lsgth]\n");
+	printf(" -l           start in listener mode (no message sent)\n");
+	printf(" -s <size>    start with message of <size>\n");
+	printf(" -g <size>    grow each iteration with <size>\n");
+	printf(" -t <number>  start <number> threads in background\n");
+	printf("\n");
+}
+
+int main (int argc, char **argv) {
 	cpg_handle_t handle;
+	char *newptr;
 	unsigned int size = 1;
 	int i;
 	unsigned int res;
+	int ch;
+
+	while ((ch = getopt (argc, argv, "hls:g:t:")) != -1) {
+		switch (ch) {
+			case 'l':
+				listen_only = 1;
+				break;
+			case 's':
+				size = atoi(optarg);
+				printf("[INFO] set start size to %d\n", size);
+				break;
+			case 'g':
+				size_grow = atoi(optarg);
+				printf("[INFO] set grow size to %d\n", size_grow);
+				break;
+			case 't':
+				thread_count = atoi(optarg);
+				printf("[INFO] start %d in background\n", thread_count);
+				break;
+			case 'h':
+			case '?':
+			default:
+				usage();
+				return 1;
+		}
+	}
 	
 	signal (SIGALRM, sigalrm_handler);
 	res = cpg_initialize (&handle, &callbacks);
@@ -173,9 +226,24 @@
 		exit (1);
 	}
 
+	if ( !listen_only )
+	{
+		/* start thread
+		 */
+		while ( thread_count-- > 0 )
+			pthread_create(&thread, NULL, thread_run, NULL);
+	}
+
 	for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
+		newptr = realloc(data, size);
+		if (newptr == NULL) {
+			printf ("cannot realloc data to %d, abort.\n");
+			break;
+		}
+		data = newptr;
+
 		cpg_benchmark (handle, size);
-		size += 1000;
+		size += size_grow;
 	}
 
 	res = cpg_finalize (handle);
@@ -183,5 +251,9 @@
 		printf ("cpg_join failed with result %d\n", res);
 		exit (1);
 	}
+
+	if (data != NULL)
+		free(data);
+
 	return (0);
 }
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to