Hello,

I'm running an MPI program that uses passive target RMA to access shared arrays.

On some systems this program does not work as expected.
When running on several nodes it produces the correct results, but only the
process with rank 0 (the one holding the shared arrays in its local memory)
actually works on the shared arrays, which is not the intended behavior.
This has happened with OpenMPI 4, in particular with OpenMPI 4.0.5 and
OpenMPI 4.1.4.

However, when compiling and running with OpenMPI 3 (in particular
OpenMPI 3.1.4), the program works as expected and all processes work on the
shared structures.

In addition, when OpenMPI 4 is compiled to use verbs instead of UCX, the
program also works as expected.
We have therefore concluded that there may be a problem with the use of UCX
in OpenMPI.
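
For completeness, UCX can also be disabled at runtime through MCA parameters,
which should make it easy to compare both code paths (component names as in
OpenMPI 4.x; ./a.out stands for the compiled test program):

    mpirun --mca pml ^ucx --mca osc ^ucx -np 8 ./a.out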

About the system I am working on:

 - Nodes on the system are connected through an InfiniBand FDR network.
 - I'm compiling with g++ (GCC) 8.3.0 and running the different versions of
OpenMPI stated above.

I attach a sample code to help reproduce the undesired behavior.
I also include the output of the test program (1) when behaving improperly
and (2) when behaving properly.

Can someone help me understand whether the problem lies in the program or in
OpenMPI and UCX?
Thanks a lot!

(1) Output when behaving improperly:
+--------------------------------------------------+
Rank: 0 ||| Position: 0
Rank: 0 ||| Position: 10000
Rank: 0 ||| Position: 20000
...
Rank: 0 ||| Position: 850000
Rank: 0 ||| Position: 860000
Rank: 0 ||| Position: 870000
...
Rank: 0 ||| Position: 19970000
Rank: 0 ||| Position: 19980000
Rank: 0 ||| Position: 19990000
*****
*****
*****
*****
***** Small correctness check *****
Position 0
      ||| Input value:     0
      ||| Output value:    0.00
      ||| Expected output: 0.00
...
Position 19999999
      ||| Input value:     19999999
      ||| Output value:    49999997.50
      ||| Expected output: 49999997.50
*****
*****
*****
*****
***** Accesses per process data *****
Process 0 accesses: 2000
Process 1 accesses: 0
Process 2 accesses: 0
Process 3 accesses: 0
Process 4 accesses: 0
Process 5 accesses: 0
Process 6 accesses: 0
Process 7 accesses: 0
+--------------------------------------------------+



(2) Output when behaving properly:
+--------------------------------------------------+
Rank: 0 ||| Position: 70000
Rank: 0 ||| Position: 80000
Rank: 0 ||| Position: 90000
...
Rank: 3 ||| Position: 240000
Rank: 4 ||| Position: 280000
Rank: 7 ||| Position: 190000
...
Rank: 3 ||| Position: 19760000
Rank: 2 ||| Position: 19850000
Rank: 6 ||| Position: 19940000
*****
*****
*****
*****
***** Small correctness check *****
Position 0
      ||| Input value:     0
      ||| Output value:    0.00
      ||| Expected output: 0.00
...
Position 19999999
      ||| Input value:     19999999
      ||| Output value:    49999997.50
      ||| Expected output: 49999997.50
*****
*****
*****
*****
***** Accesses per process data *****
Process 0 accesses: 425
Process 1 accesses: 226
Process 2 accesses: 222
Process 3 accesses: 226
Process 4 accesses: 228
Process 5 accesses: 227
Process 6 accesses: 222
Process 7 accesses: 224
+--------------------------------------------------+


The sample code:

#include <iostream>
#include <iomanip>
#include <vector>    // std::vector
#include <cstddef>   // size_t
#include <cstdlib>   // EXIT_SUCCESS

using std::cout;
using std::endl;

// MPI added
#include <mpi.h>

#define MPI_RANK_0 0

#define MULT_FACTOR 2.5

static void
process_data(const int *input_buffer,
             double *output_buffer,
             size_t BLOCK_SIZE){

    // Multiply each input element by MULT_FACTOR into the output buffer
    for (size_t i = 0; i < BLOCK_SIZE; i++)
    {
        output_buffer[i] = (double) input_buffer[i] * MULT_FACTOR;
    }
}


int
main(int argc, char **argv) {

    int rank, number_of_processes;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &number_of_processes);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const size_t VECTOR_SIZE = 20000000;
    // Only rank 0 exposes real memory; the other ranks create
    // zero-sized windows.
    const size_t MY_SIZE = rank ? 0 : VECTOR_SIZE;
    int * main_input_buffer = nullptr;
    double * main_output_buffer = nullptr;

    // Rank 0 has the input data
    if (rank == MPI_RANK_0){
        MPI_Alloc_mem(VECTOR_SIZE * sizeof(int), MPI_INFO_NULL, &main_input_buffer);
        MPI_Alloc_mem(VECTOR_SIZE * sizeof(double), MPI_INFO_NULL, &main_output_buffer);
        for (size_t i = 0; i < VECTOR_SIZE; i++){
            main_input_buffer[i] = (int)i;
        }
    }

    // We will create a shared index to access shared data on Rank 0
    // Also, we will share input and output buffers on P0
    size_t * main_buffer_index;
    MPI_Alloc_mem(1 * sizeof(size_t), MPI_INFO_NULL, &main_buffer_index);
    *main_buffer_index = 0;
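    // (Every rank allocates its own index, but only rank 0's copy is ever
    // targeted by the RMA operations below.)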
    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Win index_window, input_window, output_window;

    MPI_Win_create(main_buffer_index, 1 * sizeof(size_t), sizeof(size_t),
                    MPI_INFO_NULL, MPI_COMM_WORLD, &index_window);

    MPI_Win_create(main_input_buffer, 
                    MY_SIZE * sizeof(int), 
                    sizeof(int),
                    MPI_INFO_NULL,
                    MPI_COMM_WORLD,
                    &input_window);

    MPI_Win_create(main_output_buffer, 
                    MY_SIZE * sizeof(double), 
                    sizeof(double),
                    MPI_INFO_NULL,
                    MPI_COMM_WORLD,
                    &output_window);
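    // (MPI_Win_create is collective: ranks with MY_SIZE == 0 participate
    // too, exposing zero-byte windows.)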

    // Store info about window access
    int times_accessed = 0;
    std::vector<int> times_accessed_per_process(number_of_processes);

    //********** IDEA OF THE PROGRAM **********//
    // Get an index from shared index window.
    // Read input buffer using that index
    // Process a block of data
    // Write the results back

    const size_t BLOCK_SIZE = 10000;
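    // BLOCK_SIZE divides VECTOR_SIZE exactly (20000000 / 10000 = 2000
    // blocks), so no partial-block handling is needed.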
    size_t * current_position;
    MPI_Alloc_mem(1 * sizeof(size_t), MPI_INFO_NULL, &current_position);

    int * tmp_input_buffer;
    double * tmp_output_buffer;

    MPI_Alloc_mem(BLOCK_SIZE * sizeof(int), MPI_INFO_NULL, &tmp_input_buffer);
    MPI_Alloc_mem(BLOCK_SIZE * sizeof(double), MPI_INFO_NULL, &tmp_output_buffer);

    // Get the initial index and atomically advance the shared index by
    // BLOCK_SIZE so the next process claims the following block
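    // NOTE: MPI_Fetch_and_op is atomic, so a shared lock suffices here.
    // Passing size_t buffers with the MPI_AINT datatype assumes
    // sizeof(size_t) == sizeof(MPI_Aint), which holds on typical 64-bit
    // systems.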
    MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
    MPI_Fetch_and_op(&BLOCK_SIZE, current_position, MPI_AINT, 
                  MPI_RANK_0, 0, MPI_SUM, index_window);
    MPI_Win_unlock(MPI_RANK_0, index_window);

    // Work loop: each process repeatedly claims a block, copies it locally,
    // processes it, and writes the result back to rank 0's output buffer
    while (*current_position < VECTOR_SIZE){
        
        // Get BLOCK_SIZE ints from Rank 0
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 
                    0, input_window);
        MPI_Get(tmp_input_buffer, BLOCK_SIZE, MPI_INT, 
                MPI_RANK_0, *current_position, BLOCK_SIZE, 
                MPI_INT, input_window);
        MPI_Win_unlock(MPI_RANK_0, input_window);

        // Process data
        process_data(tmp_input_buffer, tmp_output_buffer, BLOCK_SIZE);

        // Write results to output buffer
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, output_window);
        MPI_Put(tmp_output_buffer, BLOCK_SIZE, MPI_DOUBLE, 
                MPI_RANK_0, *current_position, BLOCK_SIZE,
                MPI_DOUBLE, output_window);
        MPI_Win_unlock(MPI_RANK_0, output_window);
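        // (Unlocking completes the MPI_Put both at the origin and at the
        // target.)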

        // Print data processed
        cout << "Rank: " << rank << " ||| Position: " << *current_position << endl;
        // Store your access
        times_accessed += 1;

        // Get next position
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
        MPI_Fetch_and_op(&BLOCK_SIZE, current_position, MPI_AINT, 
                    MPI_RANK_0, 0, MPI_SUM, index_window);
        MPI_Win_unlock(MPI_RANK_0, index_window);
    }

    // When all has been processed, free windows
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Win_free(&index_window);
    MPI_Win_free(&input_window);
    MPI_Win_free(&output_window);

    // Collect access information
    MPI_Gather(&times_accessed, 1, MPI_INT,
               times_accessed_per_process.data(), 1, MPI_INT,
               MPI_RANK_0, MPI_COMM_WORLD);

    // Print first and last positions for correctness
    // Also print access data
    if(rank == MPI_RANK_0){
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "***** Small correctness check *****" << endl;
        cout << std::fixed;
        cout << std::setprecision(2);

        for(size_t i = 0; i < 10; i++)
            cout << "Position " << i << endl << "\t||| Input value:     " << main_input_buffer[i] << endl << "\t||| Output value:    " << main_output_buffer[i] << endl << "\t||| Expected output: " << (double) main_input_buffer[i] * MULT_FACTOR << endl;

        cout << "..." << endl;

        for(size_t i = VECTOR_SIZE-10; i < VECTOR_SIZE; i++)
            cout << "Position " << i << endl << "\t||| Input value:     " << main_input_buffer[i] << endl << "\t||| Output value:    " << main_output_buffer[i] << endl << "\t||| Expected output: " << (double) main_input_buffer[i] * MULT_FACTOR << endl;

        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "***** Accesses per process data *****" << endl;
        for(int i = 0; i < number_of_processes; i++)
            cout << "Process " << i << " accesses: " << times_accessed_per_process[i] << endl;

    }

    // Free memory
    MPI_Free_mem(main_buffer_index);
    MPI_Free_mem(current_position);
    MPI_Free_mem(tmp_input_buffer);
    MPI_Free_mem(tmp_output_buffer);
    if (rank == MPI_RANK_0){
        MPI_Free_mem(main_input_buffer);
        MPI_Free_mem(main_output_buffer);
    }
    
    MPI_Finalize();

    return EXIT_SUCCESS;
    
}
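
For reference, I compile and launch the test with the usual OpenMPI wrappers
(file and binary names here are placeholders):

    mpicxx -O2 rma_test.cpp -o rma_test
    mpirun -np 8 ./rma_test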
