The machines have 4 cores. THREADS_DEFAULT is an upper bound: the program spawns the threads one at a time. At the beginning, only one thread performs the ping-pong test; then a second thread is created and the two threads run the ping-pong test concurrently; then a third thread is created and the three threads run the test, and so on up to THREADS_DEFAULT.

If your machines have 2 cores, you should set THREADS_DEFAULT to 2, so that the machines are not overloaded.

In my case (machines with 4 cores), THREADS_DEFAULT is set to 16 in order to study the behavior of the MPI implementation in an extreme case. TCP seems to handle the concurrency quite well up to 8 threads; beyond that, it tends to crash or hang (I guess CPU oversubscription has not been studied very deeply in Open MPI).
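By the way, there is no need to recompile to lower the thread count: the -T option overrides THREADS_DEFAULT at run time. A run over the TCP BTL on a 2-core machine could look something like this (assuming the binary is called concurrent_ping, as in the attachment below; the mpirun/--mca options are the usual Open MPI ones):

   mpirun -np 2 --mca btl tcp,sm,self ./concurrent_ping -T 2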

Francois


Scott Atchley wrote:
Francois,

How many cores do your machines have?

The file specifies THREADS_DEFAULT 16. Does this spawn 16 threads per MPI rank?

I see crashes when I run this over MX, both with the BTL (mx,sm,self) and with the MTL. If I change THREADS_DEFAULT to 4, I see crashes with TCP (BTL with tcp,sm,self) as well.

With THREADS_DEFAULT at 16, TCP seems to hang. I only have 2 cores, which may be why. :-)

Scott

On Jun 12, 2009, at 3:09 AM, François Trahay wrote:

Here's the program.
It should print something like this (columns: thread id, message size in bytes, one-way latency in µs, bandwidth in 10^6 bytes/s, bandwidth in MiB/s):

[1 communicating threads]
[0]     1       2.484936           0.402           0.384
[0]     2       2.478036           0.807           0.770
[0]     4       2.501503           1.599           1.525
[0]     8       2.497516           3.203           3.055
thread #1
[2 communicating threads]
[0]     1       3.970628           0.252           0.240
[1]     1       3.929280           0.254           0.243
[1]     2       4.087206           0.489           0.467
[0]     2       5.181758           0.386           0.368
[1]     4       3.715222           1.077           1.027
[0]     4       4.358013           0.918           0.875
[1]     8       4.166852           1.920           1.831
[0]     8       3.628287           2.205           2.103
thread #2
[3 communicating threads]
[0]     1       5.922241           0.169           0.161
[2]     1       6.896299           0.145           0.138
[1]     1       8.973834           0.111           0.106
...


Francois

George Bosilca wrote:
I will take a look at the BTL problem. Can you provide a copy of the benchmark, please?

 Thanks,
   george.

On Jun 11, 2009, at 16:05, François Trahay wrote:

concurrent_ping




/*
* NewMadeleine
* Copyright (C) 2006 (see AUTHORS file)
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or (at
* your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* General Public License for more details.
*/
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include "mpi.h"

#include <pthread.h>
#include <semaphore.h>
#include <sched.h>

/* This program performs several ping-pongs in parallel.
 * This evaluates the efficiency of accessing nmad from 1, 2, 3, ... n threads simultaneously.
 */

#define LEN_DEFAULT      4
#define WARMUPS_DEFAULT    1000
#define LOOPS_DEFAULT    10000
#define THREADS_DEFAULT 16
#define DATA_CONTROL_ACTIVATED 0

static int    comm_rank    = -1;
static int    comm_size    = -1;
static char    host_name[1024]    = "";

static int       max_len   =  16;
static int     loops;
static int       threads;
static int       warmups;

static sem_t ready_sem;

/* round counter: each worker thread busy-waits until go reaches its iteration index
 * (volatile because it is written by main while the workers poll it) */
static volatile int go;

static __inline__
uint32_t _next(uint32_t len, uint32_t multiplier, uint32_t increment)
{
 if (!len)
   return 1+increment;
 else
   return len*multiplier+increment;
}

void usage_ping() {
 fprintf(stderr, "-L len - packet length [%d]\n", LEN_DEFAULT);
 fprintf(stderr, "-N iterations - iterations [%d]\n", LOOPS_DEFAULT);
fprintf(stderr, "-T thread - number of communicating threads [%d]\n", THREADS_DEFAULT); fprintf(stderr, "-W warmup - number of warmup iterations [%d]\n", WARMUPS_DEFAULT);
}

static void fill_buffer(char *buffer, int len) {
 unsigned int i = 0;

 for (i = 0; i < len; i++) {
   buffer[i] = 'a'+(i%26);
 }
}

static void clear_buffer(char *buffer, int len) {
 memset(buffer, 0, len);
}

#if DATA_CONTROL_ACTIVATED
static void control_buffer(char *msg, char *buffer, int len) {
 tbx_bool_t   ok = tbx_true;
 unsigned char expected_char;
 unsigned int          i  = 0;

 for(i = 0; i < len; i++){
   expected_char = 'a'+(i%26);

   if(buffer[i] != expected_char){
     printf("Bad data at byte %d: expected %c, received %c\n",
            i, expected_char, buffer[i]);
     ok = tbx_false;
   }
 }


 if (!ok) {
   printf("Controling %s - ", msg);
   printf("%d bytes reception failed\n", len);

   TBX_FAILURE("data corruption");
 } else {
   printf("ok\n");
 }
}
#endif


void *
server(void* arg) {
 int     my_pos = (int)(intptr_t)arg;   /* thread index passed by pthread_create */
 char    *buf   = NULL;
 uint8_t tag    = (uint8_t)(intptr_t)arg;
 int    i, k;
 int len;

 buf = malloc(max_len);
 clear_buffer(buf, max_len);
 for(i = my_pos; i <= threads; i++) {
/* Be sure all the communicating threads have been created before we start */
   while(go < i )
     sched_yield();

   for(len=1; len < max_len; len*=2){
     for(k = 0; k < loops + warmups; k++) {

      MPI_Request request;

      MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

#if DATA_CONTROL_ACTIVATED
      control_buffer("received", buf, len);
#endif
      MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD);

     }
   }

   sem_post(&ready_sem);
 }
 return NULL;
}

void *
client(void *arg) {
 int        my_pos = (int)(intptr_t)arg;
 uint8_t    tag    = (uint8_t)my_pos;
 char        *buf    = NULL;
 double t1, t2;
 double     sum, lat, bw_million_byte, bw_mbyte;
 int        i, k;
 int len;

 fprintf(stderr, "thread #%d\n", my_pos);
 buf = malloc(max_len);
 clear_buffer(buf, max_len);

 fill_buffer(buf, max_len);  /* fill the whole buffer; len is only set inside the loop below */
 for(i = my_pos; i <= threads; i++) {
/* Be sure all the communicating threads have been created before we start */
   while(go < i )
     sched_yield();

   for(len=1; len < max_len; len*=2){
     for(k = 0; k < warmups; k++) {
        MPI_Request request;
#if DATA_CONTROL_ACTIVATED
        control_buffer("sending", buf, len);
#endif
        MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD);

        MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
#if DATA_CONTROL_ACTIVATED
        control_buffer("received", buf, len);
#endif
     }

     t1= MPI_Wtime();

     for(k = 0; k < loops; k++) {
       MPI_Request request;
 #if DATA_CONTROL_ACTIVATED
       control_buffer("sending", buf, len);
 #endif
        MPI_Send(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD);
        MPI_Recv(buf, len, MPI_CHAR, (comm_rank+1)%2, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 #if DATA_CONTROL_ACTIVATED
       control_buffer("received", buf, len);
 #endif
     }

     t2 = MPI_Wtime();

     sum = (t2 - t1)*1e6;

     lat          = sum / (2 * loops);
     bw_million_byte = len * (loops / (sum / 2));
     bw_mbyte        = bw_million_byte / 1.048576;

printf("[%d]\t%d\t%lf\t%8.3f\t%8.3f\n", my_pos, len, lat, bw_million_byte, bw_mbyte);
     fflush(stdout);
   }

   sem_post(&ready_sem);
 }
 return NULL;
}
int
main(int      argc,
    char    **argv) {
 int         i, j;
 pthread_t    * pid;
 static sem_t bourrin_ready;
 pthread_attr_t attr;

 //len =        LEN_DEFAULT;
 loops = LOOPS_DEFAULT;
 threads =    THREADS_DEFAULT;
 warmups =    WARMUPS_DEFAULT;

 int provided;
 int needed = MPI_THREAD_MULTIPLE;
 MPI_Init_thread(&argc, &argv, needed, &provided);
 if(provided < needed){
      fprintf(stderr, "needed: %d, provided: %d\n", needed, provided);
      MPI_Finalize();
      exit(1);
 }
 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);


 if (argc > 1 && !strcmp(argv[1], "--help")) {
   usage_ping();
   exit(0);
 }

 for(i=1 ; i<argc ; i+=2) {
   if (!strcmp(argv[i], "-N")) {
     loops = atoi(argv[i+1]);
   }
   else if (!strcmp(argv[i], "-L")) {
        //len = atoi(argv[i+1]);
   }
   else if (!strcmp(argv[i], "-T")) {
     threads = atoi(argv[i+1]);
   }
   else if (!strcmp(argv[i], "-W")) {
     warmups = atoi(argv[i+1]);
   }
   else {
     fprintf(stderr, "Illegal argument %s\n", argv[i]);
     usage_ping();
     exit(0);
   }
 }

 pthread_attr_init(&attr);
 pid = malloc(sizeof(pthread_t) * threads);
 sem_init(&ready_sem, 0, 0);

 go = 0;
 for (i = 0 ; i< threads ; i++) {
   printf("[%d communicating threads]\n", i+1);
   if (comm_rank == 0) {
     pthread_create(&pid[i], &attr, server, (void *)(intptr_t)i);
   } else {
     pthread_create(&pid[i], &attr, client, (void *)(intptr_t)i);
   }

   for( j = 0; j <= i; j++){
     sem_wait(&ready_sem);
     go = j;
   }
   go++;
 }

 for(i=0;i<threads;i++)
   pthread_join(pid[i],NULL);

 MPI_Finalize();
 exit(0);
}