OK, I will try to explain a few more things about the shuffling. To avoid
confusion I have attached only specific excerpts of the code, and I have
added many comments.

First, let me note that this project is an implementation of the Terasort
benchmark with a master node which assigns jobs to the slaves and
communicates with them after each phase to get measurements.

The file shuffle_before.cc shows how I have been doing the shuffling up to now,
and shuffle_after.cc shows the progress I have made so far in switching to
Allgatherv().

I have also included the code that measures time and data size, since it's
crucial for me to check whether I get a speedup in rate.

Some questions I have are:
1. At shuffle_after.cc:61, why do we reserve *comm.Get_size()* entries for
*recv_counts* and not *comm.Get_size()-1*? For example, if I am rank k, what
is the point of *recv_counts[k-1]*? I guess that rank k also receives data
from itself but we can ignore it, right?

2. My next concern is about the structure of the buffer *recv_buf[]*. The
documentation says that the data is stored there in order. So I assume that
it's stored as segments of char* ordered by rank, and the way to distinguish
them is to chop the whole data based on *recv_counts[]*. So let G = {g1,
g2, ..., gN} be a group that exchanges data. Let's take slave g2: then the
segment *recv_buf[0 until recv_counts[0]-1]* is what g2 received from g1,
the next segment, until *recv_counts[1]-1*, is what g2 received from itself
(ignore it), and so on... Is this idea correct?
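
A note on both questions, with a small sketch. MPI_Allgatherv delivers every rank's contribution to every rank, including the caller's own, so *recv_counts* needs one entry per rank in the communicator; the caller's own segment is present in *recv_buf* and can simply be skipped. Segment i starts at the displacement given for rank i and is *recv_counts[i]* bytes long, so the second segment ends at index *recv_counts[0]+recv_counts[1]-1*, not at *recv_counts[1]-1*. The sketch below assumes the displacements are the contiguous prefix sums of the counts; the names `segment_offsets` and `segment_from` are hypothetical helpers, not part of shuffle_after.cc.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: starting offset of each rank's segment, assuming the
// segments are packed back to back in rank order (displs = prefix sums).
std::vector<int> segment_offsets(const std::vector<int>& recv_counts) {
    std::vector<int> displs(recv_counts.size());
    int total = 0;
    for (std::size_t i = 0; i < recv_counts.size(); ++i) {
        displs[i] = total;          // where rank i's data begins
        total += recv_counts[i];    // running total of bytes seen so far
    }
    return displs;
}

// Hypothetical helper: copy out the bytes that were received from `rank`.
std::vector<unsigned char> segment_from(const std::vector<unsigned char>& recv_buf,
                                        const std::vector<int>& recv_counts,
                                        int rank) {
    const std::vector<int> displs = segment_offsets(recv_counts);
    auto first = recv_buf.begin() + displs[rank];
    return std::vector<unsigned char>(first, first + recv_counts[rank]);
}
```

With counts {3, 2, 4} the offsets are {0, 3, 5}, so a receiver that is rank 1 would skip bytes 3..4 (its own data) and keep bytes 0..2 and 5..8.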

So I have written a sketch of the code in shuffle_after.cc, in which I also try
to explain how the master will compute the rate, but at least I want to make it
I know that this discussion is getting long but if you have some free time
can you take a look at it?


On Tue, Nov 7, 2017 at 9:34 AM, George Bosilca <bosi...@icl.utk.edu> wrote:

> If each process sends a different amount of data, then the operation should
> be an allgatherv. This also requires that you know the amount each process
> will send, so you will need an allgather first. Schematically the code should
> look like the following:
> // MPI_Allgatherv expects int counts and int displacements
> int send_count = (int)(endata.size * sizeof(char));  // bytes sent by this process
> int* recv_counts = (int*)malloc(comm_size * sizeof(int));  // amounts from all peers
> int* displs = (int*)malloc(comm_size * sizeof(int));  // displacements for each peer
> MPI_Allgather(&send_count, 1, MPI_INT, recv_counts, 1, MPI_INT,
>               comm);  // exchange the amount of sent data
> int total = 0;  // we need a total amount of data to be received
> for (int i = 0; i < comm_size; i++) {
>     displs[i] = total;  // update the displacements
>     total += recv_counts[i];  // and the total count
> }
> char* recv_buf = (char*)malloc(total * sizeof(char));  // buffer for the allgatherv
> // pass the data pointer itself, not the address of the pointer
> MPI_Allgatherv(endata.data, send_count, MPI_UNSIGNED_CHAR,
>                recv_buf, recv_counts, displs, MPI_UNSIGNED_CHAR, comm);
> George.
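
One practical caveat, sketched here as an aside: in the classic C interface, MPI_Allgatherv takes its counts and displacements as int, while the sizes in this thread are held in unsigned long long. A minimal guard, assuming the payload is counted in bytes, could look like the following; `to_mpi_count` is a hypothetical helper name, not an MPI function.

```cpp
#include <climits>
#include <stdexcept>

// Hypothetical helper: narrow a byte count to the int that MPI count
// arguments expect, refusing values that would not fit.
int to_mpi_count(unsigned long long bytes) {
    if (bytes > static_cast<unsigned long long>(INT_MAX)) {
        throw std::overflow_error("byte count does not fit in an MPI int count");
    }
    return static_cast<int>(bytes);
}
```

If the check fails, one common workaround is to count in larger units (for example one element per line, built with MPI_Type_contiguous) rather than in single bytes.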
> On Tue, Nov 7, 2017 at 4:23 AM, Konstantinos Konstantinidis <
> kostas1...@gmail.com> wrote:
>> OK, I started implementing the above Allgather() idea without success
>> (segmentation fault). So I will post the problematic lines here:
>> * comm.Allgather(&(endata.size), 1, MPI::UNSIGNED_LONG_LONG,
>> &(endata_rcv.size), 1, MPI::UNSIGNED_LONG_LONG);*
>> * endata_rcv.data = new unsigned char[endata_rcv.size*lineSize];*
>> * comm.Allgather(&(endata.data), endata.size*lineSize,
>> MPI::UNSIGNED_CHAR, &(endata_rcv.data), endata_rcv.size*lineSize,
>> * delete [] endata.data;*
>> The idea (as it was also for the broadcasts) is first to transmit the
>> data size as an unsigned long long integer, so that the receivers can
>> reserve the required memory for the actual data to be transmitted after
>> that. To my understanding, the problem is that each broadcast datum, call
>> it D(s,G) as I explained in the previous email, is not only different but
>> also has a different size (in general). I think so because, if I replace
>> the 3rd line with
>> * comm.Allgather(&(endata.data), 1, MPI::UNSIGNED_CHAR,
>> &(endata_rcv.data), 1, MPI::UNSIGNED_CHAR);*
>> it seems to work without a seg. fault, but this is pointless for me since I
>> don't want only 1 char to be transmitted. So, looking at the previous image I
>> posted, imagine that the red, green and blue squares are different in size.
>> Can Allgather() even work then? If not, do you suggest anything else, or am I
>> trapped in using MPI_Bcast() as shown in Option 1?
>> On Mon, Nov 6, 2017 at 8:58 AM, George Bosilca <bosi...@icl.utk.edu>
>> wrote:
>>> On Sun, Nov 5, 2017 at 10:23 PM, Konstantinos Konstantinidis <
>>> kostas1...@gmail.com> wrote:
>>>> Hi George,
>>>> First, let me note that the cost of [q^(k-1)]*(q-1) communicators was
>>>> fine for the values of the parameters q,k I am working with. Also, the whole
>>>> point of speeding up the shuffling phase is to reduce this number
>>>> even more (compared to already known implementations), which is a major
>>>> concern of my project. But thanks for pointing that out. Btw, do you know
>>>> what the maximum such number is in MPI?
>>> Last time I ran into such troubles these limits were: 2k for MVAPICH,
>>> 16k for MPICH and 2^30-1 for OMPI (all positive signed 32-bit integers).
>>> It might have changed in the meantime.
>>>> Now to the main part of the question, let me clarify that I have 1
>>>> process per machine. I don't know if this is important here, but my way of
>>>> thinking is that we have a big text file and each process has to work
>>>> on some chunks of it (like chapters of a book). But each process resides on
>>>> a machine with some RAM, which can handle a specific amount of work,
>>>> so if you spawn many processes per machine you must have fewer book
>>>> chapters per process than before. Thus, I wanted to think at the machine
>>>> level, with its RAM limitations, rather than at the process level.
>>>> Now to the actual shuffling, here is what I am currently doing (Option
>>>> 1):
>>>> Let's denote the data that slave s has to send to the slaves in group G
>>>> as D(s,G).
>>>> *for each slave s in 1,2,...,K{*
>>>> *    for each group G that s participates in{*
>>>> *        if (my rank is s){*
>>>> *            MPI_Bcast(send data D(s,G))*
>>>> *        }else if (my rank is in group G){*
>>>> *            MPI_Bcast(get data D(s,G))*
>>>> *        }else{*
>>>> *            Do nothing*
>>>> *        }*
>>>> *    }*
>>>> *    MPI::COMM_WORLD.Barrier();*
>>>> *}*
>>>> What I suggested before to speed things up (Option 2) is:
>>>> *for each set {G(1),G(2),...,G(q-1)} of q-1 disjoint groups{*
>>>> *    for each slave s in G(1){*
>>>> *        if (my rank is s){*
>>>> *            MPI_Bcast(send data D(s,G(1)))*
>>>> *        }else if (my rank is in group G(1)){*
>>>> *            MPI_Bcast(get data D(s,G(1)))*
>>>> *        }else{*
>>>> *            Do nothing*
>>>> *        }*
>>>> *    }*
>>>> *    for each slave s in G(2){*
>>>> *        if (my rank is s){*
>>>> *            MPI_Bcast(send data D(s,G(2)))*
>>>> *        }else if (my rank is in group G(2)){*
>>>> *            MPI_Bcast(get data D(s,G(2)))*
>>>> *        }else{*
>>>> *            Do nothing*
>>>> *        }*
>>>> *    }*
>>>> *    ...*
>>>> *    for each slave s in G(q-1){*
>>>> *        if (my rank is s){*
>>>> *            MPI_Bcast(send data D(s,G(q-1)))*
>>>> *        }else if (my rank is in group G(q-1)){*
>>>> *            MPI_Bcast(get data D(s,G(q-1)))*
>>>> *        }else{*
>>>> *            Do nothing*
>>>> *        }*
>>>> *    }*
>>>> *    MPI::COMM_WORLD.Barrier();*
>>>> *}*
>>>> My hope was that I could implement Option 2 (in some way that avoids
>>>> copying and pasting the same code q-1 times every time I change q) and that
>>>> this could bring a speedup of q-1 compared to Option 1 by having these
>>>> groups communicate in parallel. Right now, I am trying to find a way to
>>>> identify these sets of groups based on my implementation, which involves
>>>> some abstract algebra, but for now let's assume that I can find them in an
>>>> efficient manner.
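
As a stand-in for the algebraic construction mentioned above (which is not shown in this thread), here is a greedy sketch that partitions a list of groups into rounds of pairwise-disjoint groups, so that Option 2 becomes a loop over rounds instead of q-1 pasted blocks. The names `Group`, `Round` and `disjoint_rounds` are hypothetical, and the greedy rule is an assumption: it does not guarantee exactly q-1 groups per round.

```cpp
#include <cstddef>
#include <set>
#include <vector>

using Group = std::vector<int>;          // ranks of the slaves in one group
using Round = std::vector<std::size_t>;  // indices into the list of groups

// Greedy sketch: place each group into the first round whose groups share
// no rank with it; open a new round when every existing round clashes.
std::vector<Round> disjoint_rounds(const std::vector<Group>& groups) {
    std::vector<Round> rounds;
    std::vector<std::set<int>> used;     // ranks already busy in each round
    for (std::size_t g = 0; g < groups.size(); ++g) {
        std::size_t r = 0;
        for (; r < rounds.size(); ++r) {
            bool clash = false;
            for (int rank : groups[g]) {
                if (used[r].count(rank)) { clash = true; break; }
            }
            if (!clash) break;           // round r can host this group
        }
        if (r == rounds.size()) {        // no compatible round found
            rounds.emplace_back();
            used.emplace_back();
        }
        rounds[r].push_back(g);
        used[r].insert(groups[g].begin(), groups[g].end());
    }
    return rounds;
}
```

Each rank would then walk the rounds in order and, within a round, take part only in the one group (if any) that contains it.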
>>>> Let me emphasize that each broadcast sends different actual data. There
>>>> are no two broadcasts that send the same D(s,G).
>>>> Finally, let's go to MPI_Allgather(): I am really confused since I have
>>>> never used this call but I have this image in my mind:
>>> If every member of a group does a bcast to all other members of the same
>>> group, then this operation is better realized by an allgather. The picture
>>> you attached clearly exposes the data movement pattern, where each color box
>>> gets distributed to all members of the same communicator. You could also
>>> see this operation as a loop of bcasts where the iterator goes over all
>>> members of the communicator and uses each one as the root.
>>>> I am not sure what you meant but now I am thinking of this (let commG
>>>> be the intra-communicator of group G):
>>>> *for each possible group G{*
>>>> *    if (my rank is in G){*
>>>> *        commG.MPI_AllGather(send data D(rank,G))*
>>>> *    }else{*
>>>> *        Do nothing*
>>>> *    }*
>>>> *    MPI::COMM_WORLD.Barrier();*
>>>> *}*
>>> This is indeed what I was thinking about, with the condition that you
>>> make sure the list of communicators in G is ordered in the same way on all
>>> processes.
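
To illustrate the ordering condition: with blocking collectives, two processes that share two groups must enter the corresponding Allgather calls in the same order, otherwise each can end up waiting inside a different collective. One simple way to get a common order, assuming every process holds the same set of groups as lists of ranks, is to put the list into a canonical form before looping over it. `Group` and `canonical_order` are hypothetical names used only for this sketch.

```cpp
#include <algorithm>
#include <vector>

using Group = std::vector<int>;  // ranks belonging to one group

// Sort the members of every group, then sort the groups lexicographically,
// so that every process iterates over the same sequence of groups.
void canonical_order(std::vector<Group>& groups) {
    for (Group& g : groups) {
        std::sort(g.begin(), g.end());
    }
    std::sort(groups.begin(), groups.end());
}
```

Two processes that built the same groups in a different order end up with identical lists after this call.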
>>> That being said, this communication pattern 1) generates a large barrier
>>> in your code; 2) as all processes will potentially be involved in many
>>> collective communications, you will be hammering the network in a
>>> significant way (so you will have to take network congestion into
>>> account); and 3) all processes need to have all the memory for the receive
>>> buffers allocated. Thus, even by implementing a nice communication
>>> scheme you might encounter some performance issues.
>>> Another way to do this is, instead of conglomerating all
>>> communications in a single temporal location, to spread them out across
>>> time by imposing your own communication logic. This basically translates a
>>> set of blocking collectives (bcast is a perfect target) into a pipelined
>>> mix. Instead of describing such a scheme here I suggest you read the
>>> algorithmic description of the SUMMA and/or PUMMA distributed matrix
>>> multiplication algorithms.
>>>   George.
>>>> I am not sure whether this makes sense since I am confused about the
>>>> correspondence of the data transmitted with Allgather() compared to the
>>>> notation D(s,G) I am currently using.
>>>> Thanks.
>>>> On Tue, Oct 31, 2017 at 11:11 PM, George Bosilca <bosi...@icl.utk.edu>
>>>> wrote:
>>>>> It really depends on what you are trying to achieve. If the question is
>>>>> rhetorical: "can I write a code that does broadcasts in parallel on
>>>>> independent groups of processes?" then the answer is yes, this is
>>>>> certainly possible. If however you add a hint of practicality to your
>>>>> question, "can I write an efficient parallel broadcast between independent
>>>>> groups of processes?", then I'm afraid the answer will be a negative one.
>>>>> Let's not look at how you can write the multiple bcast code, as the
>>>>> answer on stackoverflow is correct, but instead look at what resources
>>>>> these collective operations are using. In general you can assume that
>>>>> nodes are connected by a network able to move data at a rate B in both
>>>>> directions (full duplex). Assuming the implementation of the bcast
>>>>> algorithm is not entirely moronic, the bcast can saturate the network with
>>>>> a single process per node. Now, if you have multiple processes per node
>>>>> (P), then either you schedule them sequentially (so that each one has the
>>>>> full bandwidth B) or you let them progress in parallel, in which case each
>>>>> participating process can claim a lower bandwidth B/P (as it is shared
>>>>> between all processes on the node).
>>>>> So even if you are able to expose enough parallelism, physical
>>>>> resources will impose the real hard limit.
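
A rough worked version of this argument, ignoring latency and assuming each of the P processes on a node has to move a message of S bytes over a link of bandwidth B (S is a symbol introduced here only for illustration):

```latex
T_{\text{sequential}} = P \cdot \frac{S}{B},
\qquad
T_{\text{parallel}} = \frac{S}{B/P} = P \cdot \frac{S}{B}
```

Under these assumptions both schedules finish at the same time, so running the broadcasts concurrently on one node does not by itself buy any speedup.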
>>>>> That being said I have the impression you are trying to implement an
>>>>> MPI_Allgather(v) using a series of MPI_Bcast. Is that true ?
>>>>>   George.
>>>>> PS: A few other constraints: the cost of creating the [q^(k-1)]*(q-1)
>>>>> communicators might be prohibitive; the MPI library might support only a
>>>>> limited number of communicators.
>>>>> On Tue, Oct 31, 2017 at 11:42 PM, Konstantinos Konstantinidis <
>>>>> kostas1...@gmail.com> wrote:
>>>>>> Assume that we have K=q*k nodes (slaves) where q,k are positive
>>>>>> integers >= 2.
>>>>>> Based on the scheme that I am currently using I create
>>>>>> [q^(k-1)]*(q-1) groups (along with their communicators). Each group
>>>>>> consists of k nodes and within each group exactly k broadcasts take place
>>>>>> (each node broadcasts something to the rest of them). So in total
>>>>>> [q^(k-1)]*(q-1)*k MPI broadcasts take place. Let me skip the details of
>>>>>> the above scheme.
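
To make these counts concrete, here is a small sketch that evaluates K = q*k, the number of groups [q^(k-1)]*(q-1) and the number of broadcasts [q^(k-1)]*(q-1)*k for given q and k. The names `ipow`, `ShuffleCounts` and `shuffle_counts` are hypothetical, and no overflow checking is done.

```cpp
#include <cstdint>

// Integer power by repeated multiplication (no overflow check).
std::uint64_t ipow(std::uint64_t base, unsigned exp) {
    std::uint64_t result = 1;
    while (exp-- > 0) {
        result *= base;
    }
    return result;
}

struct ShuffleCounts {
    std::uint64_t nodes;       // K = q*k
    std::uint64_t groups;      // [q^(k-1)]*(q-1)
    std::uint64_t broadcasts;  // groups * k
};

// Counts for the scheme described in the text, for q >= 2 and k >= 2.
ShuffleCounts shuffle_counts(unsigned q, unsigned k) {
    const std::uint64_t groups = ipow(q, k - 1) * (q - 1);
    return {static_cast<std::uint64_t>(q) * k, groups, groups * k};
}
```

For q = 2 and k = 3 this gives 6 nodes, 4 groups and 12 broadcasts.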
>>>>>> Now theoretically I figured out that there are q-1 groups that can
>>>>>> communicate in parallel at the same time, i.e. groups that have no common
>>>>>> nodes, and I would like to utilize that to speed up the shuffling. I have
>>>>>> seen here
>>>>>> https://stackoverflow.com/questions/11372012/mpi-several-broadcast-at-the-same-time
>>>>>> that this is possible in MPI.
>>>>>> In my case it's more complicated since q,k are parameters of the
>>>>>> problem and change between different experiments. If I get the idea of
>>>>>> the 2nd method that is proposed there and assume that we have only 3
>>>>>> groups within which some communication takes place, one can simply do:
>>>>>> *if my rank belongs to group 1{*
>>>>>> *    comm1.Bcast(..., ..., ..., rootId);*
>>>>>> *}else if my rank belongs to group 2{*
>>>>>> *    comm2.Bcast(..., ..., ..., rootId);*
>>>>>> *}else if my rank belongs to group 3{*
>>>>>> *    comm3.Bcast(..., ..., ..., rootId);*
>>>>>> *} *
>>>>>> where comm1, comm2, comm3 are the corresponding sub-communicators
>>>>>> that contain only the members of each group.
>>>>>> But how can I generalize the above idea to an arbitrary number of groups,
>>>>>> or perhaps do something else?
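
One possible generalization, sketched under the assumption that the groups of a given round are pairwise disjoint and known to every rank: replace the if/else chain by a lookup that returns the index of the group containing the caller. That index can select an entry in a vector of sub-communicators, or serve as the color argument of MPI_Comm_split (with MPI_UNDEFINED for ranks that belong to no group). `group_color` is a hypothetical helper name.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Returns the index of the first group that contains `rank`, or -1 if the
// rank belongs to none of the groups.
int group_color(int rank, const std::vector<std::vector<int>>& groups) {
    for (std::size_t g = 0; g < groups.size(); ++g) {
        const std::vector<int>& members = groups[g];
        if (std::find(members.begin(), members.end(), rank) != members.end()) {
            return static_cast<int>(g);
        }
    }
    return -1;
}
```

A rank whose color is c >= 0 would then issue its Bcast on the c-th communicator only, whatever the number of groups is.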
>>>>>> The code is in C++ and the MPI installed is described in the attached
>>>>>> file.
>>>>>> Regards,
>>>>>> Kostas
