Hi Loh,

I used MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); in my 
program and got provided = 0, which turns out to be MPI_THREAD_SINGLE. Does 
this mean that I cannot use the MPI_THREAD_MULTIPLE model? I wrote a little 
program to test the multithreading; sometimes it worked, sometimes it failed, 
and sometimes it hung. Is this only because MPI_THREAD_MULTIPLE is not 
supported, or are there bugs in the program? I have attached the little 
program below:



#include <pthread.h>
#include <string.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

#include "mpi.h"
using namespace std;
#define MSG_QUERY_SIZE 16  //sizeof(MPI_query_msg) = 16

/*
 * Fixed-layout message exchanged between ranks.
 *
 * The program transmits it as MSG_QUERY_SIZE raw bytes (MPI_CHAR), so the
 * struct must stay exactly four ints with no extra members.
 *
 * Besides the meanings listed on `flag`, this test program also has the
 * master set flag to -1 to tell a slave (and, via the ring forward, the
 * neighbour's background receiver) to terminate.
 */
struct MPI_query_msg
{
 int flag;     // -1: request cell; 0: query coordinate; 1: there is no cell to grant
 int x;
 int y;
 int ignited;  // if x,y are not negative, then ignited: 0 is not ignited, 1 is ignited
};

void* backRecv(void* arg)
{
 int myid, nprocs;
 pthread_mutex_init(&_dealmutex2, NULL);
 stringstream RANK;
 MPI_Status status;
 MPI_Request  req2;
 MPI_Comm_rank(MPI_COMM_WORLD, &myid);
 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
 int left = (myid - 1 + nprocs - 1) % (nprocs - 1);
 int right = (myid + 1) % (nprocs - 1);
 MPI_query_msg rMSG;
 RANK << myid;
 cout << myid << " create background message recv" << endl;
 int x, y;
 //char c;
 int m;
 int count = 0;
 string filename("f_");
 filename += RANK.str();
 filename += "_backRecv.txt";
 fstream fout(filename.c_str(), ios::out);
 if(!fout)
 {
  cout << "can not create the file " << filename << endl;
  fout.close();
  exit(1);
 }

 while(true)
 {
  MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 222, 
MPI_COMM_WORLD, &status);
  //MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, 222, 
MPI_COMM_WORLD, &req2);
  //MPI_Wait(&req2, &status);
  fout << "BACKREV:" << myid << " recv from " << status.MPI_SOURCE << " 
rMSG.flag = " << rMSG.flag << " with tag 222" << endl;
  fout.flush();
  if(rMSG.flag == -1)
  {
   fout << "*******backRecv FINISHED IN " << myid << "********" << endl;
   fout.flush();
   fout.close();
   pthread_exit(NULL);
   return 0;
  } 
 };
}

int main(int argc, char **argv) 
{
 int myid = 0;
 int provided;
 int nprocs = 0;
 pthread_t pt1 = 0;
    pthread_t pt2 = 0;;
 int pret1 = 0;
 int pret2 = 0;
 int i = 0, j = 0, m = 0;
 //MPI_Status status;
 MPI_Request  requ1, requ2;
 MPI_Status status1, status2;

 MPI_Init_thread(&argc,&argv, MPI_THREAD_MULTIPLE, &provided);
 //MPI_Init(&argc,&argv);
   MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
   MPI_Comm_rank(MPI_COMM_WORLD,&myid); 
 pthread_mutex_init(&_dealmutex, NULL);

 if(myid == nprocs - 1)  // the last one
 {
  if(provided == MPI_THREAD_MULTIPLE)
  {
   cout << myid << " got MPI_THREAD_MULTIPLE " << endl;
  }
  else
  {
   cout << myid << " MPI_THREAD_MULTIPLE = " << MPI_THREAD_MULTIPLE << endl;
   cout << myid << " MPI_THREAD_SINGLE = " << MPI_THREAD_SINGLE << endl;
   cout << myid << " got provided = " << provided << endl;
  }
  MPI_query_msg sMSGqueue[50], rMSG;
  for(i=0; i<50; i++)
  {
   sMSGqueue[i].flag = i;
   sMSGqueue[i].x = i;
   sMSGqueue[i].y = i;
   sMSGqueue[i].ignited = i;
  }
  while(j < 50)
  {
   MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, 
MPI_COMM_WORLD, &status2);
   //MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, 
MPI_COMM_WORLD, &requ2);
   //MPI_Wait(&requ2, &status2);
   cout << "MAIN(" << j << "): " << myid << " recvs from "<< status2.MPI_SOURCE 
<< " with tag = " << status2.MPI_TAG << " rMSG.flag = " << rMSG.flag << endl;

   MPI_Send(&(sMSGqueue[j]), MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, 
status2.MPI_TAG, MPI_COMM_WORLD);
   //MPI_Isend(&(sMSGqueue[j]), MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, 
status2.MPI_TAG, MPI_COMM_WORLD, &requ1);
   //MPI_Wait(&requ1, &status1);
   cout << "MAIN(" << j << "): " << myid << " sends to "<< status2.MPI_SOURCE 
<< " with tag = " << status2.MPI_TAG << " sMSGqueue[j].flag = " << 
sMSGqueue[j].flag << endl;
   j++;
  };
  int count = 0;
  while(true)
  {
   MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, 
MPI_COMM_WORLD, &status2);
   //MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, 
MPI_COMM_WORLD, &requ2);
   //MPI_Wait(&requ2, &status2);
   rMSG.flag = -1;
   MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, 
status2.MPI_TAG, MPI_COMM_WORLD);
   //MPI_Isend(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, status2.MPI_SOURCE, 
status2.MPI_TAG, MPI_COMM_WORLD, &requ1);
   //MPI_Wait(&requ1, &status1);
   cout << "MAIN sends termination to " << status2.MPI_SOURCE << endl;
   count++;
   if(count == myid)
    break;
  };
  cout << "*******************************MAIN SUCESS!" << endl;
 }
 else
 {
  pret1 = pthread_create(&pt1, NULL, backRecv, NULL);
  if(pret1 != 0)
  {
   cout << myid << "backRecv Thread Create Failed." << endl;
   exit(1);
  }
  MPI_query_msg sMSG, rMSG;
  rMSG.flag = myid;
  rMSG.x = myid;
  rMSG.y = myid;
  rMSG.ignited = myid;
  sMSG.flag = myid;
  sMSG.x = myid;
  sMSG.y = myid;
  sMSG.ignited = myid;
  int left = (myid - 1 + nprocs - 1) % (nprocs - 1);
  int right = (myid + 1) % (nprocs - 1);
  while(true)
  {
   MPI_Send(&sMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD);
   //MPI_Isend(&sMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD, 
&requ1);
   //MPI_Wait(&requ1, &status1);
   cout << "SLAVE: " << myid << " sends to "<< nprocs-1 << " sMSG.x = " << 
sMSG.x << endl;
   MPI_Recv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD, 
&status2);
   //MPI_Irecv(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, nprocs-1, myid, MPI_COMM_WORLD, 
&requ2);
   //MPI_Wait(&requ2, &status2);
   cout << "SLAVE: " << myid << " recvs from "<< nprocs-1 << " rMSG.flag = " << 
rMSG.flag << endl;
   MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD);
   //MPI_Isend(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD, 
&requ1);
   //MPI_Wait(&requ1, &status1);

   if(rMSG.flag == -1)
   {
    //MPI_Send(&rMSG, MSG_QUERY_SIZE, MPI_CHAR, right, 222, MPI_COMM_WORLD);
    break;
   }
   for(j=0; j<(myid+1)*300; ++j)
   {}
  };
  cout << "*******************************SLAVE" << myid << " SUCESS!" << endl;
  pthread_join(pt1, NULL);
 }
 MPI_Finalize();
}





BTW, if I want to create a background thread that behaves like a daemon 
thread, how can I achieve that in an MPI program? Thanks.



List-Post: users@lists.open-mpi.org
Date: Tue, 22 Sep 2009 10:32:50 -0700
From: eugene....@sun.com
To: us...@open-mpi.org
Subject: Re: [OMPI users] How to create multi-thread parallel program using 
thread-safe send and recv?

guosong wrote: 


Thanks for responding. I am using a Linux cluster. I think I would like to create a 
model that is multithreaded, where each thread can make MPI calls. I attached test 
code as follows. It has two pthreads, and there are MPI calls in both of those 
two threads. In the main function, there are also MPI calls. Should I use 
full multithreading? I guess so.  It seems like the created threads are expected 
to make independent/concurrent message-passing calls.  Do read the link I sent. 
 You need to convert from MPI_Init to MPI_Init_thread(), asking for a 
full-multithreaded model and checking that you got it.  Also note in main() 
that the MPI_Isend() calls should be matched with MPI_Wait() or similar calls.  
I guess the parent thread will sit in such calls while the child threads do 
their own message passing.  Good luck.

_________________________________________________________________
约会说不清地方?来试试微软地图最新msn互动功能!
http://ditu.live.com/?form=TL&swm=1

Reply via email to