Hello, I'm running an MPI program which uses passive RMA to access shared arrays.

On some systems this program does not work as expected. When running on several nodes it produces the correct results, but only the process with rank 0 (the one holding the shared arrays in its local memory) actually works on the shared arrays, which is not the intended behavior. This happens with Open MPI 4, in particular with 4.0.5 and 4.1.4. However, when compiling and running with Open MPI 3 (in particular 3.1.4), the program works as expected and all processes work on the shared structures. In addition, when Open MPI 4 is compiled to use verbs instead of UCX, the program also works as expected. We have therefore concluded that there may be a problem with the use of UCX in Open MPI.

About the system I am working on:
- Nodes are connected through an InfiniBand FDR network.
- I'm using g++ (GCC) 8.3.0 and the different versions of Open MPI stated above.

I attach sample code to help reproduce the undesired behavior. I also include the output of the test program (1) when behaving improperly and (2) when behaving properly. Can someone help me understand whether the problem is in the program or in Open MPI and UCX? Thanks a lot!

(1) Output when behaving improperly:

+--------------------------------------------------+
Rank: 0 ||| Position: 0
Rank: 0 ||| Position: 10000
Rank: 0 ||| Position: 20000
...
Rank: 0 ||| Position: 850000
Rank: 0 ||| Position: 860000
Rank: 0 ||| Position: 870000
...
Rank: 0 ||| Position: 19970000
Rank: 0 ||| Position: 19980000
Rank: 0 ||| Position: 19990000
*****
*****
*****
*****
***** Small correctness check *****
Position 0 ||| Input value: 0 ||| Output value: 0.00 ||| Expected output: 0.00
...
Position 19999999 ||| Input value: 19999999 ||| Output value: 49999997.50 ||| Expected output: 49999997.50
*****
*****
*****
*****
***** Accesses per process data *****
Process 0 accesses: 2000
Process 1 accesses: 0
Process 2 accesses: 0
Process 3 accesses: 0
Process 4 accesses: 0
Process 5 accesses: 0
Process 6 accesses: 0
Process 7 accesses: 0
+--------------------------------------------------+

(2) Output when behaving properly:

+--------------------------------------------------+
Rank: 0 ||| Position: 70000
Rank: 0 ||| Position: 80000
Rank: 0 ||| Position: 90000
...
Rank: 3 ||| Position: 240000
Rank: 4 ||| Position: 280000
Rank: 7 ||| Position: 190000
...
Rank: 3 ||| Position: 19760000
Rank: 2 ||| Position: 19850000
Rank: 6 ||| Position: 19940000
*****
*****
*****
*****
***** Small correctness check *****
Position 0 ||| Input value: 0 ||| Output value: 0.00 ||| Expected output: 0.00
...
Position 19999999 ||| Input value: 19999999 ||| Output value: 49999997.50 ||| Expected output: 49999997.50
*****
*****
*****
*****
***** Accesses per process data *****
Process 0 accesses: 425
Process 1 accesses: 226
Process 2 accesses: 222
Process 3 accesses: 226
Process 4 accesses: 228
Process 5 accesses: 227
Process 6 accesses: 222
Process 7 accesses: 224
+--------------------------------------------------+
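In case it helps with reproducing: as far as I understand, the same comparison can also be made at launch time instead of rebuilding, by steering Open MPI's component selection (this assumes a build that contains both the UCX and the alternative components; ./rma_test is just a placeholder name for the compiled sample below):

    # One-sided communication over UCX (the configuration that misbehaves for us)
    mpirun -np 8 --mca pml ucx --mca osc ucx ./rma_test

    # Exclude the UCX components so Open MPI falls back to other implementations
    mpirun -np 8 --mca pml ob1 --mca osc ^ucx ./rma_test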
#include <iostream>
#include <numeric>
#include <stdexcept>
#include <iomanip>
#include <cstdlib>   // EXIT_SUCCESS

using std::cout;
using std::endl;

// MPI added
#include <mpi.h>

#define MPI_RANK_0 0
#define MULT_FACTOR 2.5

static void process_data(int *input_buffer, double *output_buffer, size_t BLOCK_SIZE)
{
    for (size_t i = 0; i < BLOCK_SIZE; i++) {
        output_buffer[i] = (double) input_buffer[i] * MULT_FACTOR;
    }
}

int main(int argc, char **argv)
{
    int rank, number_of_processes;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &number_of_processes);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const size_t VECTOR_SIZE = 20000000;
    const size_t MY_SIZE = rank ? 0 : VECTOR_SIZE;

    // NULL-initialized so the size-0 window creation on non-zero ranks
    // gets a defined pointer value
    int * main_input_buffer = NULL;
    double * main_output_buffer = NULL;

    // Rank 0 has the input data
    if (rank == MPI_RANK_0){
        MPI_Alloc_mem(VECTOR_SIZE * sizeof(int), MPI_INFO_NULL, &main_input_buffer);
        MPI_Alloc_mem(VECTOR_SIZE * sizeof(double), MPI_INFO_NULL, &main_output_buffer);
        for (size_t i = 0; i < VECTOR_SIZE; i++){
            main_input_buffer[i] = (int)i;
        }
    }

    // We will create a shared index to access shared data on Rank 0
    // Also, we will share input and output buffers on P0
    size_t * main_buffer_index;
    MPI_Alloc_mem(1 * sizeof(size_t), MPI_INFO_NULL, &main_buffer_index);
    *main_buffer_index = 0;

    MPI_Barrier(MPI_COMM_WORLD);

    MPI_Win index_window, input_window, output_window;
    MPI_Win_create(main_buffer_index, 1 * sizeof(size_t), sizeof(size_t),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &index_window);
    MPI_Win_create(main_input_buffer, MY_SIZE * sizeof(int), sizeof(int),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &input_window);
    MPI_Win_create(main_output_buffer, MY_SIZE * sizeof(double), sizeof(double),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &output_window);

    // Store info about window access
    int times_accessed = 0;
    int times_accessed_per_process[number_of_processes];

    //********** IDEA OF THE PROGRAM **********//
    // Get an index from the shared index window.
    // Read the input buffer using that index.
    // Process a block of data.
    // Write the results back.

    const size_t BLOCK_SIZE = 10000;

    size_t * current_position;
    MPI_Alloc_mem(1 * sizeof(size_t), MPI_INFO_NULL, &current_position);

    int * tmp_input_buffer;
    double * tmp_output_buffer;
    MPI_Alloc_mem(BLOCK_SIZE * sizeof(int), MPI_INFO_NULL, &tmp_input_buffer);
    MPI_Alloc_mem(BLOCK_SIZE * sizeof(double), MPI_INFO_NULL, &tmp_output_buffer);

    // Get initial index
    // And increase index 10000 units for the next one to come
    MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
    MPI_Fetch_and_op(&BLOCK_SIZE, current_position, MPI_AINT, MPI_RANK_0, 0,
                     MPI_SUM, index_window);
    MPI_Win_unlock(MPI_RANK_0, index_window);

    while (*current_position < VECTOR_SIZE){
        // Get 10000 ints from Rank 0
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, input_window);
        MPI_Get(tmp_input_buffer, BLOCK_SIZE, MPI_INT, MPI_RANK_0,
                *current_position, BLOCK_SIZE, MPI_INT, input_window);
        MPI_Win_unlock(MPI_RANK_0, input_window);

        // Process data
        process_data(tmp_input_buffer, tmp_output_buffer, BLOCK_SIZE);

        // Write results to output buffer
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, output_window);
        MPI_Put(tmp_output_buffer, BLOCK_SIZE, MPI_DOUBLE, MPI_RANK_0,
                *current_position, BLOCK_SIZE, MPI_DOUBLE, output_window);
        MPI_Win_unlock(MPI_RANK_0, output_window);

        // Print data processed
        cout << "Rank: " << rank << " ||| Position: " << *current_position << endl;

        // Store your access
        times_accessed += 1;

        // Get next position
        MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
        MPI_Fetch_and_op(&BLOCK_SIZE, current_position, MPI_AINT, MPI_RANK_0, 0,
                         MPI_SUM, index_window);
        MPI_Win_unlock(MPI_RANK_0, index_window);
    }

    // When all has been processed, free windows
    MPI_Barrier(MPI_COMM_WORLD);
    MPI_Win_free(&index_window);
    MPI_Win_free(&input_window);
    MPI_Win_free(&output_window);

    // Collect access information
    MPI_Gather(&times_accessed, 1, MPI_INT, times_accessed_per_process, 1,
               MPI_INT, MPI_RANK_0, MPI_COMM_WORLD);

    // Print first and last positions for correctness
    // Also print access data
    if (rank == MPI_RANK_0){
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "***** Small correctness check *****" << endl;
        cout << std::fixed;
        cout << std::setprecision(2);
        for (size_t i = 0; i < 10; i++)
            cout << "Position " << i
                 << " ||| Input value: " << main_input_buffer[i]
                 << " ||| Output value: " << main_output_buffer[i]
                 << " ||| Expected output: " << (double) main_input_buffer[i] * MULT_FACTOR << endl;
        cout << "..." << endl;
        for (size_t i = VECTOR_SIZE - 10; i < VECTOR_SIZE; i++)
            cout << "Position " << i
                 << " ||| Input value: " << main_input_buffer[i]
                 << " ||| Output value: " << main_output_buffer[i]
                 << " ||| Expected output: " << (double) main_input_buffer[i] * MULT_FACTOR << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "*****" << endl;
        cout << "***** Accesses per process data *****" << endl;
        for (int i = 0; i < number_of_processes; i++)
            cout << "Process " << i << " accesses: " << times_accessed_per_process[i] << endl;
    }

    // Free memory
    MPI_Free_mem(main_buffer_index);
    MPI_Free_mem(current_position);
    MPI_Free_mem(tmp_input_buffer);
    MPI_Free_mem(tmp_output_buffer);
    if (rank == MPI_RANK_0){
        MPI_Free_mem(main_input_buffer);
        MPI_Free_mem(main_output_buffer);
    }

    MPI_Finalize();
    return EXIT_SUCCESS;
}
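One detail I'm not fully sure is correct in the sample: MPI_Fetch_and_op is called with size_t buffers but the MPI_AINT datatype, which only matches on platforms where MPI_Aint and size_t have the same width (presumably the case on our 64-bit nodes). In case it matters, a type-matched sketch of the index fetch would look like this; for a fully matched version the shared index behind index_window would also be declared as MPI_Aint:

    // Type-matched variant of the fetch-and-add on the shared index:
    // both local buffers are MPI_Aint, matching the MPI_AINT datatype.
    MPI_Aint block = (MPI_Aint) BLOCK_SIZE;  // amount added to the shared index
    MPI_Aint position;                       // index value before the addition
    MPI_Win_lock(MPI_LOCK_SHARED, MPI_RANK_0, 0, index_window);
    MPI_Fetch_and_op(&block, &position, MPI_AINT, MPI_RANK_0, 0, MPI_SUM, index_window);
    MPI_Win_unlock(MPI_RANK_0, index_window);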