Hi again,

As promised, I implemented a small program reproducing the error.

The program's main routine spawns a pthread which calls the function
"exchange". "exchange" uses MPI_Isend/MPI_Irecv/MPI_Waitany to exchange
a buffer of double-precision numbers with all other nodes.

At the same time, the "main" routine computes the sum of its own buffer
and reduces it over all nodes using MPI_Allreduce.

To compile and run the program, do the following:

        mpicc -g -Wall mpitest.c -pthread
        mpirun -np 8 ./a.out

Timing is, of course, of the essence, and you may have to run the program
a few times or twiddle with the value of "usleep" in line 146 for it to
hang. To see where things go bad, you can do the following:

        mpirun -np 8 xterm -e gdb -ex run ./a.out

Things go bad when MPI_Allreduce is called while any of the threads are
in MPI_Waitany. The value of "usleep" in line 146 should be long enough
for all the nodes to have started exchanging data but small enough so
that they are not done yet.
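
In case it saves some editing, the value could also be taken from the
command line instead of being hard-coded: a small (hypothetical) variant
of line 146 would be

        /* Take the nap length in microseconds from the command line,
           defaulting to the hard-coded 10000. */
        useconds_t nap = ( argc > 1 ) ? (useconds_t)atoi( argv[1] ) : 10000;
        usleep( nap );

so that different timings can be tried with e.g. "mpirun -np 8 ./a.out 20000"
without recompiling.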

Cheers,
Pedro



On Thu, 2011-10-20 at 11:25 +0100, Pedro Gonnet wrote:
> Short update:
> 
> I just installed version 1.4.4 from source (compiled with
> --enable-mpi-threads), and the problem persists.
> 
> I should also point out that if, in thread (ii), I wait for the
> nonblocking communication in thread (i) to finish, nothing bad happens.
> But this makes the nonblocking communication somewhat pointless.
> 
> Cheers,
> Pedro
> 
> 
> On Thu, 2011-10-20 at 10:42 +0100, Pedro Gonnet wrote:
> > Hi all,
> > 
> > I am currently working on a multi-threaded hybrid parallel simulation
> > which uses both pthreads and OpenMPI. The simulation uses several
> > pthreads per MPI node.
> > 
> > My code uses the nonblocking routines MPI_Isend/MPI_Irecv/MPI_Waitany
> > quite successfully to implement the node-to-node communication. When I
> > try to interleave other computations during this communication, however,
> > bad things happen.
> > 
> > I have two MPI nodes with two threads each: one thread (i) doing the
> > nonblocking communication and the other (ii) doing other computations.
> > At some point, the threads (ii) need to exchange data using
> > MPI_Allreduce, which fails if the first thread (i) has not completed all
> > the communication, i.e. if thread (i) is still in MPI_Waitany.
> > 
> > Using the in-place MPI_Allreduce, I get a re-run of this bug:
> > http://www.open-mpi.org/community/lists/users/2011/09/17432.php. If I
> > don't use in-place, the call to MPI_Waitany (thread i) on one of the
> > MPI nodes waits forever. 
> > 
> > My guess is that when the thread (ii) calls MPI_Allreduce, it gets
> > whatever the other node sent with MPI_Isend to thread (i), drops
> > whatever it should have been getting from the other node's
> > MPI_Allreduce, and the call to MPI_Waitall hangs.
> > 
> > Is this a known issue? Is MPI_Allreduce not designed to work alongside
> > the nonblocking routines? Is there a "safe" variant of MPI_Allreduce I
> > should be using instead?
> > 
> > I am using OpenMPI version 1.4.3 (version 1.4.3-1ubuntu3 of the package
> > openmpi-bin in Ubuntu). Both MPI nodes are run on the same dual-core
> > computer (Lenovo x201 laptop).
> > 
> > If you need more information, please do let me know! I'll also try to
> > cook up a small program reproducing this problem...
> > 
> > Cheers and kind regards,
> > Pedro
> > 
> > 
> > 
> > 
> 

/* Include standard headers. */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <mpi.h>


/* Global variables. */
#define N 1000
int myrank, nr_nodes;
double *buff;


/* This function sends data to other nodes with isend/irecv. */
void *exchange ( void *dummy ) {

    int res, ind, k;
    MPI_Request reqs_send[ nr_nodes ], reqs_recv[ nr_nodes ];
    double sum = 0.0, *buff_recv[ nr_nodes ];

    /* Set dummy requests for myself. */
    reqs_send[myrank] = MPI_REQUEST_NULL;
    reqs_recv[myrank] = MPI_REQUEST_NULL;
    buff_recv[myrank] = NULL;

    /* Loop over the other processors and submit the sends and recvs. */
    for ( k = 0 ; k < nr_nodes ; k++ ) {

        /* Skip myself. */
        if ( k == myrank )
            continue;

        /* Ship my data to the kth node. */
        if ( ( res = MPI_Isend( buff , N , MPI_DOUBLE , k , myrank , MPI_COMM_WORLD , &reqs_send[k] ) ) != MPI_SUCCESS ) {
            printf( "exchange[%02i]: MPI_Isend to node %i failed with error %i.\n" , myrank , k , res );
            abort();
            }
        // printf( "exchange[%02i]: sent data to node %i...\n" , myrank , k ); fflush(stdout);

        /* Allocate the recv buffer. */
        if ( ( buff_recv[k] = malloc( sizeof(double) * N ) ) == NULL ) {
            printf( "exchange[%02i]: call to malloc failed.\n" , myrank );
            abort();
            }

        /* Register the recv from node k. */
        if ( ( res = MPI_Irecv( buff_recv[k] , N , MPI_DOUBLE , k , k , MPI_COMM_WORLD , &reqs_recv[k] ) ) != MPI_SUCCESS ) {
            printf( "exchange[%02i]: MPI_Irecv from node %i failed with error %i.\n" , myrank , k , res );
            abort();
            }
        // printf( "exchange[%02i]: registered recv from node %i...\n" , myrank , k ); fflush(stdout);

        }

    /* Collect the sum of our own values. */
    for ( k = 0 ; k < N ; k++ )
        sum += buff[k];

    /* Now wait for the recvs to come in. */
    while ( 1 ) {

        /* Wait for any request to come in. */
        if ( ( res = MPI_Waitany( nr_nodes , reqs_recv , &ind , MPI_STATUS_IGNORE ) ) != MPI_SUCCESS ) {
            printf( "exchange[%02i]: MPI_Waitany failed with error %i.\n" , myrank , res );
            abort();
            }

        /* Are we done? */
        if ( ind == MPI_UNDEFINED )
            break;

        /* Acknowledge receiving the data. */
        printf( "exchange[%02i]: recvd data from node %i.\n" , myrank , ind );
        fflush(stdout);

        /* Collect these values in the sum. */
        for ( k = 0 ; k < N ; k++ )
            sum += buff_recv[ind][k];

        }

    /* Wait for all the sends to come in. */
    if ( ( res = MPI_Waitall( nr_nodes , reqs_send , MPI_STATUSES_IGNORE ) ) != MPI_SUCCESS ) {
        printf( "exchange[%02i]: MPI_Waitall failed with error %i.\n" , myrank , res );
        abort();
        }

    /* De-allocate the recv buffers. */
    for ( k = 0 ; k < nr_nodes ; k++ )
        if ( buff_recv[k] != NULL )
            free( buff_recv[k] );

    /* We're all done here! */
    printf( "exchange[%02i]: sum=%e, done!\n" , myrank , sum );
    fflush(stdout);

    /* Give nothing back. */
    return NULL;

    }


/* The main routine. */

int main ( int argc , char *argv[] ) {

    int res, k;
    double sum = 0.0, buff_in;
    pthread_t thread;

    /* Start by initializing MPI. */
    if ( ( res = MPI_Init( &argc , &argv ) ) != MPI_SUCCESS ) {
        printf( "main: call to MPI_Init failed with error %i.\n" , res );
        abort();
        }
    if ( ( res = MPI_Comm_rank( MPI_COMM_WORLD , &myrank ) ) != MPI_SUCCESS ) {
        printf( "main: call to MPI_Comm_rank failed with error %i.\n" , res );
        abort();
        }
    if ( ( res = MPI_Comm_size( MPI_COMM_WORLD , &nr_nodes ) ) != MPI_SUCCESS ) {
        printf( "main[%02i]: MPI_Comm_size failed with error %i.\n" , myrank , res );
        abort();
        }
    if ( myrank == 0 ) {
        printf( "main[%02i]: MPI is up and running...\n" , myrank );
        fflush(stdout);
        }

    /* Allocate and fill the buffer. */
    if ( ( buff = malloc( sizeof(double) * N ) ) == NULL ) {
        printf( "main[%02i]: call to malloc failed.\n" , myrank );
        abort();
        }
    for ( k = 0 ; k < N ; k++ )
        buff[k] = ( 2.0*rand() ) / RAND_MAX - 1.0;

    /* Start a new thread for the nonblocking exchange. */
    if ( ( res = pthread_create( &thread , NULL , &exchange , NULL ) ) != 0 ) {
        printf( "main[%02i]: call to pthread_create failed with err=%i.\n" , myrank , res );
        abort();
        }

    /* Take a little nap. */
    usleep( 10000 );

    /* Compute the local sum of the buff. */
    srand( rand() + myrank );
    for ( k = 0 ; k < N ; k++ )
        sum += buff[k];

    /* Do an MPI_Allreduce to get the sum over all nodes. */
    if ( ( res = MPI_Allreduce( &sum , &buff_in , 1 , MPI_DOUBLE , MPI_SUM , MPI_COMM_WORLD ) ) != MPI_SUCCESS ) {
        printf( "main[%02i]: call to MPI_Allreduce failed with err=%i.\n" , myrank , res );
        abort();
        }
    sum = buff_in;

    /* We're all done here! */
    printf( "main[%02i]: sum=%e, done!\n" , myrank , sum );
    fflush(stdout);

    /* Wait for the thread to terminate. */
    if ( ( res = pthread_join( thread , NULL ) ) != 0 ) {
        printf( "main[%02i]: call to pthread_join failed with err=%i.\n" , myrank , res );
        abort();
        }

    /* Tell MPI we're done. */
    if ( ( res = MPI_Finalize() ) != MPI_SUCCESS ) {
        printf( "main[%i]: call to MPI_Finalize failed with error %i.\n" , myrank , res );
        abort();
        }

    /* Leave quietly. */
    return 0;

    }
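

P.S. One thing I should perhaps point out: mpitest.c only calls MPI_Init,
even though two threads end up inside the MPI library at the same time. If
the thread-support level matters here, the thread-aware initialization I
could switch to would look roughly like this (assuming the
--enable-mpi-threads build honours the request):

        int provided;
        if ( MPI_Init_thread( &argc , &argv , MPI_THREAD_MULTIPLE , &provided ) != MPI_SUCCESS ) {
            printf( "main: call to MPI_Init_thread failed.\n" );
            abort();
            }
        if ( provided < MPI_THREAD_MULTIPLE ) {
            printf( "main: MPI_THREAD_MULTIPLE not available, got level %i.\n" , provided );
            fflush(stdout);
            }

I'm happy to re-run the test with that variant if it helps.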


