OK, I will try to explain a few more things about the shuffling. To avoid confusion I have attached only specific excerpts of the code, with many comments added.
First, let me note that this project is an implementation of the TeraSort benchmark, with a master node which assigns jobs to the slaves and communicates with them after each phase to collect measurements. The file shuffle_before.cc shows how I have been doing the shuffling up to now, and shuffle_after.cc shows the progress I have made so far in switching to Allgatherv(). I have also included the code that measures time and data size, since it's crucial for me to check whether I get a speedup in the shuffle rate.

Some questions I have:

1. At shuffle_after.cc:61, why do we reserve comm.Get_size() entries for recv_counts and not comm.Get_size()-1? For example, if I am rank k, what is the point of my own entry in recv_counts? I guess that rank k also receives data from itself, but we can ignore that entry, right?

2. My next concern is about the structure of the buffer recv_buf[]. The documentation says that the data is stored there ordered, so I assume it's stored as segments of chars ordered by rank, and the way to distinguish them is to chop the whole buffer based on recv_counts[]. So let G = {g1, g2, ..., gN} be a group that exchanges data, and take slave g2: then segment recv_buf[0 .. recv_counts[0]-1] is what g2 received from g1, recv_buf[recv_counts[0] .. recv_counts[0]+recv_counts[1]-1] is what g2 received from itself (ignore it), and so on. Is this idea correct?

So I have written a sketch of the code in shuffle_after.cc, in which I also try to explain how the master will compute the rate, but first of all I want to make it work. I know that this discussion is getting long, but if you have some free time can you take a look at it?

Thanks,
Kostas

On Tue, Nov 7, 2017 at 9:34 AM, George Bosilca <bosi...@icl.utk.edu> wrote:

> If each process sends a different amount of data, then the operation
> should be an allgatherv. This also requires that you know the amount each
> process will send, so you will also need an allgather. Schematically the
> code should look like the following:
>
>   int bytes_send_count = endata.size * sizeof(char); // compute the amount of data (in bytes) sent by this process
>   int* recv_counts = (int*)malloc(comm_size * sizeof(int)); // allocate buffer to receive the amounts from all peers
>   int* displs = (int*)malloc(comm_size * sizeof(int)); // allocate buffer to compute the displacements for each peer
>   MPI_Allgather(&bytes_send_count, 1, MPI_INT, recv_counts, 1, MPI_INT, comm); // exchange the amounts of sent data
>   int total = 0; // we need the total amount of data to be received
>   for (int i = 0; i < comm_size; i++) {
>       displs[i] = total;        // update the displacements
>       total += recv_counts[i];  // and the total count
>   }
>   char* recv_buf = (char*)malloc(total * sizeof(char)); // prepare the receive buffer for the allgatherv
>   MPI_Allgatherv(endata.data, bytes_send_count, MPI_UNSIGNED_CHAR, recv_buf, recv_counts, displs, MPI_UNSIGNED_CHAR, comm);
>
> George.
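For reference, a minimal sketch (not code from the attached files) of how the concatenated recv_buf from the snippet above would be chopped per peer, which is the layout asked about in question 2; the handler at the end is hypothetical:

    #include <mpi.h>

    // Walk the concatenated recv_buf: rank i's segment starts at displs[i]
    // and is recv_counts[i] bytes long; our own segment is simply skipped.
    void consume_recv_buf(MPI_Comm comm, int comm_size, const char* recv_buf,
                          const int* recv_counts, const int* displs)
    {
        int my_rank;
        MPI_Comm_rank(comm, &my_rank);
        for (int i = 0; i < comm_size; i++) {
            if (i == my_rank) continue;                 // our own contribution, ignore it
            const char* segment = recv_buf + displs[i]; // data received from rank i
            int seg_len = recv_counts[i];               // its length in bytes
            // ... pass (i, segment, seg_len) to the sorting logic (hypothetical) ...
        }
    }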
> On Tue, Nov 7, 2017 at 4:23 AM, Konstantinos Konstantinidis
> <kostas1...@gmail.com> wrote:
>
>> OK, I started implementing the above Allgather() idea without success
>> (segmentation fault). So I will post the problematic lines here:
>>
>>   comm.Allgather(&(endata.size), 1, MPI::UNSIGNED_LONG_LONG,
>>                  &(endata_rcv.size), 1, MPI::UNSIGNED_LONG_LONG);
>>   endata_rcv.data = new unsigned char[endata_rcv.size*lineSize];
>>   comm.Allgather(&(endata.data), endata.size*lineSize, MPI::UNSIGNED_CHAR,
>>                  &(endata_rcv.data), endata_rcv.size*lineSize, MPI::UNSIGNED_CHAR);
>>   delete [] endata.data;
>>
>> The idea (as it was also for the broadcasts) is first to transmit the
>> data size as an unsigned long long integer, so that the receivers can
>> reserve the required memory for the actual data to be transmitted after
>> that. To my understanding, the problem is that each broadcast piece of
>> data D(s,G), as I explained in the previous email, is not only different
>> but in general also has a different size. That's because if I replace
>> the 3rd line with
>>
>>   comm.Allgather(&(endata.data), 1, MPI::UNSIGNED_CHAR,
>>                  &(endata_rcv.data), 1, MPI::UNSIGNED_CHAR);
>>
>> it seems to work without a seg. fault, but this is pointless for me
>> since I don't want only 1 char to be transmitted. So if we look at the
>> previous image I posted, imagine that the red, green and blue squares
>> are different in size. Can Allgather() even work then? If not, do you
>> suggest anything else, or am I trapped into using MPI_Bcast() as shown
>> in Option 1?
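For reference, a minimal sketch of what the corrected lines could look like, assuming endata.size counts lines of lineSize bytes each: the size exchange needs one receive slot per rank, the data pointer must be passed directly (endata.data, not &(endata.data)), and the differing sizes call for Allgatherv. The C API is used here since the MPI C++ bindings are deprecated; all names are illustrative:

    #include <mpi.h>
    #include <vector>

    // Sketch: sizes first (one slot per rank), then the variable-size payload.
    void exchange_data(MPI_Comm comm, unsigned char* send_data,
                       unsigned long long send_lines, int lineSize)
    {
        int comm_size;
        MPI_Comm_size(comm, &comm_size);

        // Phase 1: every rank learns how many lines each peer contributes.
        std::vector<unsigned long long> lines(comm_size);
        MPI_Allgather(&send_lines, 1, MPI_UNSIGNED_LONG_LONG,
                      lines.data(), 1, MPI_UNSIGNED_LONG_LONG, comm);

        // Phase 2: convert to byte counts/displacements and gather the data.
        std::vector<int> counts(comm_size), displs(comm_size);
        int total = 0;
        for (int i = 0; i < comm_size; i++) {
            counts[i] = (int)(lines[i] * lineSize);
            displs[i] = total;
            total += counts[i];
        }
        std::vector<unsigned char> recv_buf(total);
        MPI_Allgatherv(send_data, (int)(send_lines * lineSize), MPI_UNSIGNED_CHAR,
                       recv_buf.data(), counts.data(), displs.data(),
                       MPI_UNSIGNED_CHAR, comm);
        // recv_buf now holds everyone's lines, ordered by rank.
    }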
>> On Mon, Nov 6, 2017 at 8:58 AM, George Bosilca <bosi...@icl.utk.edu> wrote:
>>
>>> On Sun, Nov 5, 2017 at 10:23 PM, Konstantinos Konstantinidis
>>> <kostas1...@gmail.com> wrote:
>>>
>>>> Hi George,
>>>>
>>>> First, let me note that the cost of the q^(k-1)*(q-1) communicators
>>>> was fine for the values of the parameters q,k I am working with. Also,
>>>> the whole point of speeding up the shuffling phase is to reduce this
>>>> number even more (compared to already known implementations), which is
>>>> a major concern of my project. But thanks for pointing that out. Btw,
>>>> do you know what the maximum such number in MPI is?
>>>
>>> Last time I ran into such troubles these limits were: 2k for MVAPICH,
>>> 16k for MPICH and 2^30-1 for OMPI (all positive signed 32-bit
>>> integers). It might have changed meanwhile.
>>>
>>>> Now to the main part of the question, let me clarify that I have 1
>>>> process per machine. I don't know if this is important here, but my
>>>> way of thinking is that we have a big text file and each process will
>>>> have to work on some chunks of it (like chapters of a book). But each
>>>> process resides on a machine with some RAM which is able to handle a
>>>> specific amount of work, so if you generate many processes per machine
>>>> you must have fewer book chapters per process than before. Thus, I
>>>> wanted to think at the machine level rather than the process level,
>>>> given the RAM limitations.
>>>>
>>>> Now to the actual shuffling, here is what I am currently doing
>>>> (Option 1). Let's denote the data that slave s has to send to the
>>>> slaves in group G as D(s,G).
>>>>
>>>> for each slave s in 1,2,...,K{
>>>>   for each group G that s participates in{
>>>>     if (my rank is s){
>>>>       MPI_Bcast(send data D(s,G))
>>>>     }else if (my rank is in group G){
>>>>       MPI_Bcast(get data D(s,G))
>>>>     }else{
>>>>       Do nothing
>>>>     }
>>>>   }
>>>>   MPI::COMM_WORLD.Barrier();
>>>> }
>>>>
>>>> What I suggested before to speed things up (Option 2) is:
>>>>
>>>> for each set {G(1),G(2),...,G(q-1)} of q-1 disjoint groups{
>>>>   for each slave s in G(1){
>>>>     if (my rank is s){
>>>>       MPI_Bcast(send data D(s,G(1)))
>>>>     }else if (my rank is in G(1)){
>>>>       MPI_Bcast(get data D(s,G(1)))
>>>>     }else{
>>>>       Do nothing
>>>>     }
>>>>   }
>>>>   for each slave s in G(2){
>>>>     if (my rank is s){
>>>>       MPI_Bcast(send data D(s,G(2)))
>>>>     }else if (my rank is in G(2)){
>>>>       MPI_Bcast(get data D(s,G(2)))
>>>>     }else{
>>>>       Do nothing
>>>>     }
>>>>   }
>>>>   ...
>>>>   for each slave s in G(q-1){
>>>>     if (my rank is s){
>>>>       MPI_Bcast(send data D(s,G(q-1)))
>>>>     }else if (my rank is in G(q-1)){
>>>>       MPI_Bcast(get data D(s,G(q-1)))
>>>>     }else{
>>>>       Do nothing
>>>>     }
>>>>   }
>>>>   MPI::COMM_WORLD.Barrier();
>>>> }
>>>>
>>>> My hope was that I could implement Option 2 (in some way, without
>>>> copying and pasting the same code q-1 times every time I change q) and
>>>> that this could bring a speedup of q-1 compared to Option 1 by having
>>>> these groups communicate in parallel. Right now I am trying to find a
>>>> way to identify these sets of groups based on my implementation, which
>>>> involves some abstract algebra, but for now let's assume that I can
>>>> find them in an efficient manner.
>>>>
>>>> Let me emphasize that each broadcast sends different actual data.
>>>> There are no two broadcasts that send the same D(s,G).
>>>>
>>>> Finally, let's go to MPI_Allgather(): I am really confused since I
>>>> have never used this call, but I have this image in my mind:
>>>>
>>>> [inline image: the allgather data movement, one colored box per rank]
>>>>
>>> If every member of a group does a bcast to all other members of the
>>> same group, then this operation is better realized by an allgather. The
>>> picture you attached clearly exposes the data movement pattern, where
>>> each color box gets distributed to all members of the same
>>> communicator. You could also see this operation as a loop of bcasts
>>> where the iterator goes over all members of the communicator and uses
>>> each of them as the root.
>>>
>>>> I am not sure what you meant, but now I am thinking of this (let
>>>> commG be the intra-communicator of group G):
>>>>
>>>> for each possible group G{
>>>>   if (my rank is in G){
>>>>     commG.Allgather(send data D(rank,G))
>>>>   }else{
>>>>     Do nothing
>>>>   }
>>>>   MPI::COMM_WORLD.Barrier();
>>>> }
>>>>
>>> This is indeed what I was thinking about, with the condition that you
>>> make sure the list of communicators G is ordered in the same way on all
>>> processes.
>>>
>>> That being said, this communication pattern 1) generates a large
>>> barrier in your code; 2) as all processes will potentially be involved
>>> in many collective communications, you will be hammering the network in
>>> a significant way (so you will have to take network congestion into
>>> account); and 3) all processes need to have all the receive memory
>>> allocated for the buffers. Thus, even by implementing a nice
>>> communication scheme you might encounter some performance issues.
>>>
>>> Another way to do this is, instead of conglomerating all communications
>>> in a single temporal location, to spread them out across time by
>>> imposing your own communication logic. This basically translates a set
>>> of blocking collectives (bcast is a perfect target) into a pipelined
>>> mix. Instead of describing such a scheme here, I suggest you read the
>>> algorithmic description of the SUMMA and/or PUMMA distributed matrix
>>> multiplication algorithms.
>>>
>>> George.
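For reference, a minimal sketch of the per-group pattern discussed above, with the ordering condition George states made explicit: every process iterates over the same ordered list of group communicators (a hypothetical groupComms vector, holding MPI_COMM_NULL on non-members), so matching collectives are posted in the same order everywhere and the world barrier becomes unnecessary:

    #include <mpi.h>
    #include <vector>

    // Sketch: one allgatherv per group, iterated in the same order on all
    // processes; groupComms[g] is group g's sub-communicator on its members
    // and MPI_COMM_NULL everywhere else.
    void shuffle_all_groups(const std::vector<MPI_Comm>& groupComms,
                            unsigned char* my_data, int my_bytes)
    {
        for (std::size_t g = 0; g < groupComms.size(); g++) {
            if (groupComms[g] == MPI_COMM_NULL) continue; // not a member of group g
            MPI_Comm comm = groupComms[g];
            int size;
            MPI_Comm_size(comm, &size);
            std::vector<int> counts(size), displs(size);
            MPI_Allgather(&my_bytes, 1, MPI_INT, counts.data(), 1, MPI_INT, comm);
            int total = 0;
            for (int i = 0; i < size; i++) { displs[i] = total; total += counts[i]; }
            std::vector<unsigned char> recv_buf(total);
            MPI_Allgatherv(my_data, my_bytes, MPI_UNSIGNED_CHAR,
                           recv_buf.data(), counts.data(), displs.data(),
                           MPI_UNSIGNED_CHAR, comm);
            // recv_buf now holds the contribution of every member of group g.
        }
    }

For simplicity the sketch sends the same buffer to every group; in the actual scheme the send buffer would be the group-specific D(rank,G).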
>>>> I am not sure whether this makes sense, since I am confused about how
>>>> the data transmitted with Allgather() corresponds to the notation
>>>> D(s,G) I am currently using.
>>>>
>>>> Thanks.
>>>>
>>>> On Tue, Oct 31, 2017 at 11:11 PM, George Bosilca <bosi...@icl.utk.edu>
>>>> wrote:
>>>>
>>>>> It really depends on what you are trying to achieve. If the question
>>>>> is rhetorical, "can I write a code that does in parallel broadcasts
>>>>> on independent groups of processes?", then the answer is yes, this is
>>>>> certainly possible. If however you add a hint of practicality to your
>>>>> question, "can I write an efficient parallel broadcast between
>>>>> independent groups of processes?", then I'm afraid the answer will be
>>>>> a negative one.
>>>>>
>>>>> Let's not look at how you can write the multiple bcast code, as the
>>>>> answer on stackoverflow is correct, but instead look at what
>>>>> resources these collective operations are using. In general you can
>>>>> assume that nodes are connected by a network able to move data at a
>>>>> rate B in both directions (full duplex). Assuming the implementation
>>>>> of the bcast algorithm is not entirely moronic, the bcast can
>>>>> saturate the network with a single process per node. Now, if you have
>>>>> multiple processes per node (P), then either you schedule them
>>>>> sequentially (so that each one has the full bandwidth B) or you let
>>>>> them progress in parallel, in which case each participating process
>>>>> can claim only a lower bandwidth B/P (as it is shared between all
>>>>> processes on the node).
>>>>>
>>>>> So even if you are able to expose enough parallelism, physical
>>>>> resources will impose the real hard limit.
>>>>>
>>>>> That being said, I have the impression you are trying to implement an
>>>>> MPI_Allgather(v) using a series of MPI_Bcast. Is that true?
>>>>>
>>>>> George.
>>>>>
>>>>> PS: A few other constraints: the cost of creating the q^(k-1)*(q-1)
>>>>> communicators might be prohibitive, and the MPI library might support
>>>>> only a limited number of communicators.
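For reference, the pattern George suspects here, an MPI_Allgather realized as a series of MPI_Bcast with the root iterating over the communicator, looks roughly like the following minimal sketch (equal block sizes are assumed for simplicity; all names are hypothetical):

    #include <mpi.h>
    #include <string.h>

    // Sketch: emulate MPI_Allgather with one bcast per root. Every rank
    // contributes block_bytes bytes; recv_buf must hold size * block_bytes.
    void allgather_via_bcasts(MPI_Comm comm, const char* my_block,
                              char* recv_buf, int block_bytes)
    {
        int rank, size;
        MPI_Comm_rank(comm, &rank);
        MPI_Comm_size(comm, &size);
        for (int root = 0; root < size; root++) {
            char* slot = recv_buf + (size_t)root * block_bytes;
            if (root == rank)
                memcpy(slot, my_block, block_bytes);  // place our own contribution
            MPI_Bcast(slot, block_bytes, MPI_CHAR, root, comm); // root sends, the rest receive
        }
    }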
>>>>> On Tue, Oct 31, 2017 at 11:42 PM, Konstantinos Konstantinidis
>>>>> <kostas1...@gmail.com> wrote:
>>>>>
>>>>>> Assume that we have K=q*k nodes (slaves), where q,k are positive
>>>>>> integers >= 2.
>>>>>>
>>>>>> Based on the scheme that I am currently using, I create
>>>>>> q^(k-1)*(q-1) groups (along with their communicators). Each group
>>>>>> consists of k nodes, and within each group exactly k broadcasts take
>>>>>> place (each node broadcasts something to the rest of them). So in
>>>>>> total q^(k-1)*(q-1)*k MPI broadcasts take place. Let me skip the
>>>>>> details of the above scheme.
>>>>>>
>>>>>> Now, theoretically, I figured out that there are q-1 groups that can
>>>>>> communicate in parallel at the same time, i.e. groups that have no
>>>>>> common nodes, and I would like to utilize that to speed up the
>>>>>> shuffling. I have seen here
>>>>>> https://stackoverflow.com/questions/11372012/mpi-several-broadcast-at-the-same-time
>>>>>> that this is possible in MPI.
>>>>>>
>>>>>> In my case it's more complicated, since q,k are parameters of the
>>>>>> problem and change between different experiments. If I get the idea
>>>>>> of the 2nd method proposed there, and assume that we have only 3
>>>>>> groups within which some communication takes place, one can simply
>>>>>> do:
>>>>>>
>>>>>> if my rank belongs to group 1{
>>>>>>   comm1.Bcast(..., ..., ..., rootId);
>>>>>> }else if my rank belongs to group 2{
>>>>>>   comm2.Bcast(..., ..., ..., rootId);
>>>>>> }else if my rank belongs to group 3{
>>>>>>   comm3.Bcast(..., ..., ..., rootId);
>>>>>> }
>>>>>>
>>>>>> where comm1, comm2, comm3 are the corresponding sub-communicators
>>>>>> that contain only the members of each group.
>>>>>>
>>>>>> But how can I generalize the above idea to an arbitrary number of
>>>>>> groups, or perhaps do something else?
>>>>>>
>>>>>> The code is in C++ and the MPI installation is described in the
>>>>>> attached file.
>>>>>>
>>>>>> Regards,
>>>>>> Kostas
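For reference, a minimal sketch of one way to generalize the if/else chain above: since the groups in one set are disjoint, each rank belongs to at most one of them, so a single call site with the rank's own group communicator suffices (names such as my_group_id and root_in_group are hypothetical):

    #include <mpi.h>

    // Sketch: build one communicator per disjoint group via MPI_Comm_split
    // (color = the group id this rank belongs to, MPI_UNDEFINED if none),
    // then all groups broadcast concurrently through the same call site.
    void generalized_group_bcast(int my_group_id, char* buf, int count,
                                 int root_in_group)
    {
        MPI_Comm groupComm;
        MPI_Comm_split(MPI_COMM_WORLD, my_group_id, 0, &groupComm);
        if (groupComm != MPI_COMM_NULL) {
            MPI_Bcast(buf, count, MPI_CHAR, root_in_group, groupComm);
            MPI_Comm_free(&groupComm);
        }
    }

In practice the communicators would be created once and reused across iterations, given the communicator-creation cost George points out in his reply.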
[Attachments: shuffle_after.cc, shuffle_before.cc]