I get the same problem on 2 machines with:
CPU: Intel(R) Xeon(R) CPU 5160  @ 3.00GHz (2992.52-MHz 686-class CPU)

when starting ./cpgbench -t 10 -s 100000

secauth is not enabled.
No flow control occurs.

Regards,


* Mathieu Virbel <[email protected]> [2009-01-08 11:14:24]:

> And now, the missing file: cpgbench.c (and/or patch) are attached.
> 
> * Mathieu Virbel <[email protected]> [2009-01-08 11:09:31]:
> 
> > Hi everyone,
> > 
> > I've hit a bug while using CPG, though it may not really be related to CPG.
> > While developing a future application based on CPG, I stress-tested the
> > service, and it failed too easily.
> > 
> > The failure happens when CPG messages are too big and some threads are
> > consuming CPU. When this happens, corosync fails to send
> > messages, and shows:
> > [TOTEM ] The token was lost in the OPERATIONAL state.
> > It keeps looping on this while the test is running.
> > 
> > To make this easier to test, I've modified cpgbench to:
> > - add a "listener" mode (it only listens and sends no messages)
> > - add options to configure the starting message size and the grow size
> > - add an option to launch background threads that consume CPU
> > 
> > To reproduce the bug, you need 2 nodes. On the first, start with:
> > ./cpgbench -l
> > 
> > On the second :
> > ./cpgbench -t 10 -s 100000
> > (start with 10 CPU consumers, and set the CPG message size to 100 KB).
> > 
> > You may need to increase these values if you have a fast CPU :)
> > (this failed on CPU: VIA Samuel 2 (400.91-MHz 686-class CPU))
> > 
> > 
> > Can someone help us?
> > 
> > Regards,
> > 
> > --
> > Mathieu Virbel
> > 
> > NETASQ - We secure IT
> > 3 rue Archimède
> > 59650 Villeneuve d'Ascq
> > France
> > 
> > 
> > _______________________________________________
> > Openais mailing list
> > [email protected]
> > https://lists.linux-foundation.org/mailman/listinfo/openais

> #define _BSD_SOURCE
> /*
>  * Copyright (c) 2006 Red Hat, Inc.
>  *
>  * All rights reserved.
>  *
>  * Author: Steven Dake ([email protected])
>  *
>  * This software licensed under BSD license, the text of which follows:
>  * 
>  * Redistribution and use in source and binary forms, with or without
>  * modification, are permitted provided that the following conditions are met:
>  *
>  * - Redistributions of source code must retain the above copyright notice,
>  *   this list of conditions and the following disclaimer.
>  * - Redistributions in binary form must reproduce the above copyright notice,
>  *   this list of conditions and the following disclaimer in the documentation
>  *   and/or other materials provided with the distribution.
>  * - Neither the name of the MontaVista Software, Inc. nor the names of its
>  *   contributors may be used to endorse or promote products derived from this
>  *   software without specific prior written permission.
>  *
>  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
>  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
>  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
>  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
>  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
>  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
>  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
>  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
>  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
>  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
>  * THE POSSIBILITY OF SUCH DAMAGE.
>  */
> 
> #include <stdio.h>
> #include <stdlib.h>
> #include <string.h>
> #include <signal.h>
> #include <unistd.h>
> #include <errno.h>
> #include <unistd.h>
> #include <time.h>
> #include <sys/time.h>
> #include <sys/types.h>
> #include <sys/socket.h>
> #include <sys/select.h>
> #include <sys/un.h>
> #include <sys/socket.h>
> #include <netinet/in.h>
> #include <arpa/inet.h>
> #include <pthread.h>
> 
> #include <corosync/corotypes.h>
> #include <corosync/cpg.h>
> 
> #ifdef COROSYNC_SOLARIS
> /*
>  * timersub() replacement, compiled only when COROSYNC_SOLARIS is defined.
>  * Stores (*a - *b) in *result; when the microsecond difference is
>  * negative, one second is borrowed so tv_usec stays non-negative.
>  *
>  * The line-continuation backslash must sit on the #define line itself,
>  * otherwise the do/while body is not part of the macro.
>  */
> #define timersub(a, b, result)                                        \
>     do {                                                              \
>       (result)->tv_sec = (a)->tv_sec - (b)->tv_sec;                   \
>       (result)->tv_usec = (a)->tv_usec - (b)->tv_usec;                \
>       if ((result)->tv_usec < 0) {                                    \
>           --(result)->tv_sec;                                         \
>           (result)->tv_usec += 1000000;                               \
>       }                                                               \
>     } while (0)
> #endif
> 
> /*
>  * Set to 1 by sigalrm_handler() and polled by cpg_benchmark() to end a
>  * pass.  It is written from a signal handler, so it must be a
>  * volatile sig_atomic_t for the access to be well defined.
>  */
> volatile sig_atomic_t alarm_notice;
> /* Non-zero when -l was given: only dispatch, never send. */
> int listen_only               = 0;
> /* Bytes added to the message size after each pass (-g). */
> int size_grow         = 1000;
> /* Number of background CPU-consumer threads to start (-t). */
> int thread_count      = 0;
> /* Thread id slot handed to pthread_create(); reused for every thread. */
> pthread_t     thread  = {0};
> 
> /*
>  * Configuration-change callback registered in `callbacks`.
>  * The body is empty: every argument is ignored.
>  */
> void cpg_bm_confchg_fn (
>       cpg_handle_t handle,
>       struct cpg_name *group_name,
>       struct cpg_address *member_list, int member_list_entries,
>       struct cpg_address *left_list, int left_list_entries,
>       struct cpg_address *joined_list, int joined_list_entries)
> {
> }
> 
> /* Messages delivered so far in the current pass; reset by cpg_benchmark(). */
> unsigned int write_count;
> 
> /*
>  * Message-delivery callback registered in `callbacks`.
>  * Counts one delivered message; the message contents and the sender
>  * identification are not inspected.
>  */
> void cpg_bm_deliver_fn (
>         cpg_handle_t handle,
>         struct cpg_name *group_name,
>         uint32_t nodeid,
>         uint32_t pid,
>         void *msg,
>         int msg_len)
> {
>       write_count++;
> }
> 
> /* Callback table passed to cpg_initialize() in main(). */
> cpg_callbacks_t callbacks = {
>       .cpg_deliver_fn         = cpg_bm_deliver_fn,
>       .cpg_confchg_fn         = cpg_bm_confchg_fn
> };
> 
> /* Payload buffer sent by cpg_benchmark(); main() sizes it with realloc(). */
> char *data = NULL;
> 
> /*
>  * Run one benchmark pass, bounded by a 10 second alarm().
>  *
>  * Unless listen_only is set, each iteration multicasts write_size bytes
>  * of `data` to the joined group when flow control is disabled, retrying
>  * for as long as the call returns CS_ERR_TRY_AGAIN.  Every iteration then
>  * dispatches pending callbacks.  Once alarm_notice becomes non-zero the
>  * loop ends and one line is printed with the delivered message count,
>  * the message size, the runtime and the throughput.
>  *
>  * handle:     CPG handle to send on and dispatch from.
>  * write_size: number of bytes of `data` sent per message.
>  *
>  * Exits the process with status 1 if cpg_dispatch() fails.
>  */
> void cpg_benchmark (
>       cpg_handle_t handle,
>       int write_size)
> {
>       struct timeval tv1, tv2, tv_elapsed;
>       struct iovec iov;
>       unsigned int res;
>       cpg_flow_control_state_t flow_control_state;
>       double elapsed_sec;
> 
>       alarm_notice = 0;
>       iov.iov_base = data;
>       iov.iov_len = write_size;
> 
>       write_count = 0;
>       alarm (10);
> 
>       gettimeofday (&tv1, NULL);
>       do {
>               if (!listen_only) {
>                       /*
>                        * Test cpg message write.  flow_control_state is
>                        * only read when the query reported success, since
>                        * it may not have been written otherwise.
>                        */
>                       res = cpg_flow_control_state_get (handle, &flow_control_state);
>                       if (res == CS_OK &&
>                           flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
>                               do {
>                                       res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
>                               } while (res == CS_ERR_TRY_AGAIN);
>                       }
>               }
>               res = cpg_dispatch (handle, CS_DISPATCH_ALL);
>               if (res != CS_OK) {
>                       printf ("cpg dispatch returned error %u\n", res);
>                       exit (1);
>               }
>       } while (alarm_notice == 0);
>       gettimeofday (&tv2, NULL);
>       timersub (&tv2, &tv1, &tv_elapsed);
> 
>       /* Elapsed wall-clock time in seconds, computed once for all figures. */
>       elapsed_sec = tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0);
> 
>       /* write_count is unsigned, so it is printed with %u. */
>       printf ("%5u messages received ", write_count);
>       printf ("%5d bytes per write ", write_size);
>       printf ("%7.3f Seconds runtime ", elapsed_sec);
>       printf ("%9.3f TP/s ", ((float)write_count) / elapsed_sec);
>       printf ("%7.3f MB/s.\n",
>               ((float)write_count) * ((float)write_size) / (elapsed_sec * 1000000.0));
> }
> 
> /*
>  * SIGALRM handler installed by main(): raises alarm_notice so the loop in
>  * cpg_benchmark() stops.  The signal number is not used.
>  */
> void sigalrm_handler (int num)
> {
>       alarm_notice = 1;
> }
> 
> /* Name of the CPG group joined by main(); .length is the length of "cpg_bm". */
> static struct cpg_name group_name = {
>       .value = "cpg_bm",
>       .length = 6
> };
> 
> /*
>  * Background CPU consumer started by main() for the -t option.
>  * Prints a start-up line and then spins forever; it never returns.
>  *
>  * The counter is volatile so the busy loop cannot be optimised away, and
>  * unsigned so wrap-around is well defined (a signed counter would
>  * eventually overflow, which is undefined behaviour).
>  *
>  * data: unused thread argument.
>  */
> void *thread_run(void *data)
> {
>       volatile unsigned long spin = 0;
> 
>       (void)data;
>       printf("Start background thread\n");
>       for (;;) {
>               spin++;
>       }
> 
>       return NULL;    /* not reached */
> }
> 
> /* Write the command-line help text to stdout, one table entry per line. */
> void usage()
> {
>       static const char *const help_lines[] = {
>               "Usage: ./cpgbench [-lsgth]\n",
>               " -l           start in listener mode (no message sent)\n",
>               " -s <size>    start with message of <size>\n",
>               " -g <size>    grow each iteration with <size>\n",
>               " -t <number>  start <number> threads in background\n",
>               "\n"
>       };
>       size_t idx;
> 
>       for (idx = 0; idx < sizeof help_lines / sizeof help_lines[0]; idx++) {
>               fputs(help_lines[idx], stdout);
>       }
> }
> 
> /*
>  * Entry point of the CPG benchmark.
>  *
>  * Options: -l listener mode, -s <size> starting message size,
>  * -g <size> growth per pass, -t <number> background threads, -h help.
>  *
>  * Installs the SIGALRM handler, initializes CPG and joins the group,
>  * starts the background threads (sender mode only), then runs 50 passes
>  * of cpg_benchmark(), growing the message by size_grow after each one.
>  *
>  * Returns 0 on success and 1 after printing the usage text; exits with
>  * status 1 when a CPG call fails.
>  */
> int main (int argc, char **argv) {
>       cpg_handle_t handle;
>       char *newptr;
>       unsigned int size = 1;
>       int i;
>       unsigned int res;
>       int ch;
>       int err;
> 
>       while ((ch = getopt (argc, argv, "hls:g:t:")) != -1) {
>               switch (ch) {
>                       case 'l':
>                               listen_only = 1;
>                               break;
>                       case 's':
>                               size = atoi(optarg);
>                               printf("[INFO] set start size to %u\n", size);
>                               break;
>                       case 'g':
>                               size_grow = atoi(optarg);
>                               printf("[INFO] set grow size to %d\n", size_grow);
>                               break;
>                       case 't':
>                               thread_count = atoi(optarg);
>                               printf("[INFO] start %d in background\n", thread_count);
>                               break;
>                       case 'h':
>                       case '?':
>                       default:
>                               usage();
>                               return 1;
>               }
>       }
> 
>       signal (SIGALRM, sigalrm_handler);
>       res = cpg_initialize (&handle, &callbacks);
>       if (res != CS_OK) {
>               printf ("cpg_initialize failed with result %u\n", res);
>               exit (1);
>       }
> 
>       res = cpg_join (handle, &group_name);
>       if (res != CS_OK) {
>               printf ("cpg_join failed with result %u\n", res);
>               exit (1);
>       }
> 
>       if (!listen_only) {
>               /*
>                * Start the background CPU consumers.  A failed creation is
>                * reported and no further threads are started; the
>                * benchmark still runs with the threads already created.
>                */
>               while (thread_count-- > 0) {
>                       err = pthread_create(&thread, NULL, thread_run, NULL);
>                       if (err != 0) {
>                               printf ("pthread_create failed with result %d\n", err);
>                               break;
>                       }
>               }
>       }
> 
>       /* 50 passes; with the default growth of 1000 the size reaches ~50k. */
>       for (i = 0; i < 50; i++) {
>               newptr = realloc(data, size);
>               if (newptr == NULL) {
>                       /* data is still valid here and is released below. */
>                       printf ("cannot realloc data to %u, abort.\n", size);
>                       break;
>               }
>               data = newptr;
> 
>               cpg_benchmark (handle, size);
>               size += size_grow;
>       }
> 
>       res = cpg_finalize (handle);
>       if (res != CS_OK) {
>               printf ("cpg_finalize failed with result %u\n", res);
>               exit (1);
>       }
> 
>       /* free(NULL) is a no-op, so no guard is needed. */
>       free(data);
> 
>       return (0);
> }

> Index: test/cpgbench.c
> ===================================================================
> --- test/cpgbench.c   (révision 1718)
> +++ test/cpgbench.c   (copie de travail)
> @@ -49,6 +49,7 @@
>  #include <sys/socket.h>
>  #include <netinet/in.h>
>  #include <arpa/inet.h>
> +#include <pthread.h>
>  
>  #include <corosync/corotypes.h>
>  #include <corosync/cpg.h>
> @@ -66,6 +67,10 @@
>  #endif
>  
>  int alarm_notice;
> +int listen_only              = 0;
> +int size_grow                = 1000;
> +int thread_count     = 0;
> +pthread_t    thread  = {0};
>  
>  void cpg_bm_confchg_fn (
>       cpg_handle_t handle,
> @@ -94,7 +99,7 @@
>       .cpg_confchg_fn         = cpg_bm_confchg_fn
>  };
>  
> -char data[500000];
> +char *data = NULL;
>  
>  void cpg_benchmark (
>       cpg_handle_t handle,
> @@ -114,15 +119,18 @@
>  
>       gettimeofday (&tv1, NULL);
>       do {
> -             /*
> -              * Test checkpoint write
> -              */
> -             cpg_flow_control_state_get (handle, &flow_control_state);
> -             if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
> +             if ( !listen_only )
> +             {
> +                     /*
> +                      * Test cpg message write
> +                      */
> +                     cpg_flow_control_state_get (handle, 
> &flow_control_state);
> +                     if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
>  retry:
> -                     res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 
> 1);
> -                     if (res == CS_ERR_TRY_AGAIN) {
> -                             goto retry;
> +                             res = cpg_mcast_joined (handle, 
> CPG_TYPE_AGREED, &iov, 1);
> +                             if (res == CS_ERR_TRY_AGAIN) {
> +                                     goto retry;
> +                             }
>                       }
>               }
>               res = cpg_dispatch (handle, CS_DISPATCH_ALL);
> @@ -154,11 +162,56 @@
>       .length = 6
>  };
>  
> -int main (void) {
> +void *thread_run(void *data)
> +{
> +     int a = 0;
> +     printf("Start background thread\n");
> +     while (1) a++;
> +     return NULL;
> +}
> +
> +void usage()
> +{
> +     printf("Usage: ./cpgbench [-lsgth]\n");
> +     printf(" -l           start in listener mode (no message sent)\n");
> +     printf(" -s <size>    start with message of <size>\n");
> +     printf(" -g <size>    grow each iteration with <size>\n");
> +     printf(" -t <number>  start <number> threads in background\n");
> +     printf("\n");
> +}
> +
> +int main (int argc, char **argv) {
>       cpg_handle_t handle;
> +     char *newptr;
>       unsigned int size = 1;
>       int i;
>       unsigned int res;
> +     int ch;
> +
> +     while ((ch = getopt (argc, argv, "hls:g:t:")) != -1) {
> +             switch (ch) {
> +                     case 'l':
> +                             listen_only = 1;
> +                             break;
> +                     case 's':
> +                             size = atoi(optarg);
> +                             printf("[INFO] set start size to %d\n", size);
> +                             break;
> +                     case 'g':
> +                             size_grow = atoi(optarg);
> +                             printf("[INFO] set grow size to %d\n", 
> size_grow);
> +                             break;
> +                     case 't':
> +                             thread_count = atoi(optarg);
> +                             printf("[INFO] start %d in background\n", 
> thread_count);
> +                             break;
> +                     case 'h':
> +                     case '?':
> +                     default:
> +                             usage();
> +                             return 1;
> +             }
> +     }
>       
>       signal (SIGALRM, sigalrm_handler);
>       res = cpg_initialize (&handle, &callbacks);
> @@ -173,9 +226,24 @@
>               exit (1);
>       }
>  
> +     if ( !listen_only )
> +     {
> +             /* start thread
> +              */
> +             while ( thread_count-- > 0 )
> +                     pthread_create(&thread, NULL, thread_run, NULL);
> +     }
> +
>       for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
> +             newptr = realloc(data, size);
> +             if (newptr == NULL) {
> +                     printf ("cannot realloc data to %d, abort.\n");
> +                     break;
> +             }
> +             data = newptr;
> +
>               cpg_benchmark (handle, size);
> -             size += 1000;
> +             size += size_grow;
>       }
>  
>       res = cpg_finalize (handle);
> @@ -183,5 +251,9 @@
>               printf ("cpg_join failed with result %d\n", res);
>               exit (1);
>       }
> +
> +     if (data != NULL)
> +             free(data);
> +
>       return (0);
>  }

> _______________________________________________
> Openais mailing list
> [email protected]
> https://lists.linux-foundation.org/mailman/listinfo/openais


_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to