What: The current probe algorithm in ob1 is linear with respect to the
number of processes in the job. I wish to change the algorithm to be
linear in the number of processes with unexpected messages. To do this I
added an additional opal_list_t to the ob1 communicator and made the ob1
process a list_item_t. When an unexpected message comes in on a proc it
is added to that proc's unexpected message queue and the proc is added
to the communicator's list of procs with unexpected messages
(unexpected_procs) if it isn't already on that list. When matching a
probe request this list is used to determine which procs to look at to
find an unexpected message. The new list is protected by the matching
lock so no extra locking is needed.

Why: I have a benchmark that makes heavy use of MPI_Iprobe in one of its
phases. I discovered that the primary reason this benchmark was running
slowly with Open MPI is the probe algorithm.

When: This is another simple optimization. It only affects the
unexpected message path and will speed up probe requests. This is
intended to go into 1.7.5. Setting the timeout to next Tuesday (which
gives me time to verify the improvement at scale -- 131,000 PEs).

See the attached patch.

-Nathan

diff --git a/ompi/mca/pml/ob1/pml_ob1.c b/ompi/mca/pml/ob1/pml_ob1.c
index bfb975a..4480067 100644
--- a/ompi/mca/pml/ob1/pml_ob1.c
+++ b/ompi/mca/pml/ob1/pml_ob1.c
@@ -192,8 +192,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
 {
     /* allocate pml specific comm data */
     mca_pml_ob1_comm_t* pml_comm = OBJ_NEW(mca_pml_ob1_comm_t);
-    opal_list_item_t *item, *next_item;
-    mca_pml_ob1_recv_frag_t* frag;
+    mca_pml_ob1_recv_frag_t* frag, *next_frag;
     mca_pml_ob1_comm_proc_t* pml_proc;
     mca_pml_ob1_match_hdr_t* hdr;
     int i;
@@ -215,12 +214,9 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
         pml_comm->procs[i].ompi_proc = 
ompi_group_peer_lookup(comm->c_remote_group,i);
         OBJ_RETAIN(pml_comm->procs[i].ompi_proc);
     }
+
     /* Grab all related messages from the non_existing_communicator pending 
queue */
-    for( item = 
opal_list_get_first(&mca_pml_ob1.non_existing_communicator_pending);
-         item != 
opal_list_get_end(&mca_pml_ob1.non_existing_communicator_pending);
-         item = next_item ) {
-        frag = (mca_pml_ob1_recv_frag_t*)item;
-        next_item = opal_list_get_next(item);
+    OPAL_LIST_FOREACH_SAFE(frag, next_frag, 
&mca_pml_ob1.non_existing_communicator_pending, mca_pml_ob1_recv_frag_t) {
         hdr = &frag->hdr.hdr_match;
 
         /* Is this fragment for the current communicator ? */
@@ -231,9 +227,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
          * we should remove it from the
          * non_existing_communicator_pending list. */
         opal_list_remove_item( &mca_pml_ob1.non_existing_communicator_pending, 
-                               item );
-
-      add_fragment_to_unexpected:
+                               (opal_list_item_t *) frag);
 
         /* We generate the MSG_ARRIVED event as soon as the PML is aware
          * of a matching fragment arrival. Independing if it is received
@@ -252,9 +246,15 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
          */
         pml_proc = &(pml_comm->procs[hdr->hdr_src]);
 
+      add_fragment_to_unexpected:
+
         if( ((uint16_t)hdr->hdr_seq) == 
((uint16_t)pml_proc->expected_sequence) ) {
             /* We're now expecting the next sequence number. */
             pml_proc->expected_sequence++;
+            /* add this proc to the list of procs with unexpected messages */
+            if (0 == opal_list_get_size (&pml_proc->unexpected_frags)) {
+                opal_list_append (&pml_comm->unexpected_procs, 
&pml_proc->super);
+            }
             opal_list_append( &pml_proc->unexpected_frags, 
(opal_list_item_t*)frag );
             PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm,
                                    hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
@@ -264,9 +264,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
              * situation as the cant_match is only checked when a new fragment 
is received from
              * the network.
              */
-           for(frag = (mca_pml_ob1_recv_frag_t 
*)opal_list_get_first(&pml_proc->frags_cant_match);
-               frag != (mca_pml_ob1_recv_frag_t 
*)opal_list_get_end(&pml_proc->frags_cant_match);
-               frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_next(frag)) {
+            OPAL_LIST_FOREACH(frag, &pml_proc->frags_cant_match, 
mca_pml_ob1_recv_frag_t) {
                hdr = &frag->hdr.hdr_match;
                /* If the message has the next expected seq from that proc...  
*/
                if(hdr->hdr_seq != pml_proc->expected_sequence)
diff --git a/ompi/mca/pml/ob1/pml_ob1.h b/ompi/mca/pml/ob1/pml_ob1.h
index 5c66580..1a9dd78 100644
--- a/ompi/mca/pml/ob1/pml_ob1.h
+++ b/ompi/mca/pml/ob1/pml_ob1.h
@@ -12,7 +12,7 @@
  *                         All rights reserved.
  * Copyright (c) 2010      Oracle and/or its affiliates.  All rights reserved
  * Copyright (c) 2011      Sandia National Laboratories. All rights reserved.
- * Copyright (c) 2012      Los Alamos National Security, LLC. All rights
+ * Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights
  *                         reserved.
  * $COPYRIGHT$
  * 
diff --git a/ompi/mca/pml/ob1/pml_ob1_comm.c b/ompi/mca/pml/ob1/pml_ob1_comm.c
index 8c15722..017656d 100644
--- a/ompi/mca/pml/ob1/pml_ob1_comm.c
+++ b/ompi/mca/pml/ob1/pml_ob1_comm.c
@@ -43,9 +43,9 @@ static void 
mca_pml_ob1_comm_proc_destruct(mca_pml_ob1_comm_proc_t* proc)
 }
 
 
-static OBJ_CLASS_INSTANCE(
+OBJ_CLASS_INSTANCE(
     mca_pml_ob1_comm_proc_t,
-    opal_object_t,
+    opal_list_item_t,
     mca_pml_ob1_comm_proc_construct,
     mca_pml_ob1_comm_proc_destruct);
 
@@ -54,6 +54,7 @@ static void mca_pml_ob1_comm_construct(mca_pml_ob1_comm_t* 
comm)
 {
     OBJ_CONSTRUCT(&comm->wild_receives, opal_list_t);
     OBJ_CONSTRUCT(&comm->matching_lock, opal_mutex_t);
+    OBJ_CONSTRUCT(&comm->unexpected_procs, opal_list_t);
     comm->recv_sequence = 0;
     comm->procs = NULL;
     comm->last_probed = 0;
@@ -70,6 +71,7 @@ static void mca_pml_ob1_comm_destruct(mca_pml_ob1_comm_t* 
comm)
         free(comm->procs);
     OBJ_DESTRUCT(&comm->wild_receives);
     OBJ_DESTRUCT(&comm->matching_lock);
+    OBJ_DESTRUCT(&comm->unexpected_procs);
 }
 
 
diff --git a/ompi/mca/pml/ob1/pml_ob1_comm.h b/ompi/mca/pml/ob1/pml_ob1_comm.h
index 84aa323..442db04 100644
--- a/ompi/mca/pml/ob1/pml_ob1_comm.h
+++ b/ompi/mca/pml/ob1/pml_ob1_comm.h
@@ -1,3 +1,4 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
 /*
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
@@ -9,6 +10,8 @@
  *                         University of Stuttgart.  All rights reserved.
  * Copyright (c) 2004-2005 The Regents of the University of California.
  *                         All rights reserved.
+ * Copyright (c) 2014      Los Alamos National Security, LLC. All rights
+ *                         reserved.
  * $COPYRIGHT$
  * 
  * Additional copyrights may follow
@@ -28,7 +31,7 @@ BEGIN_C_DECLS
 
 
 struct mca_pml_ob1_comm_proc_t {
-    opal_object_t super;
+    opal_list_item_t super;        /**< allow this proc to be kept in a list */
     uint16_t expected_sequence;    /**< send message sequence number - 
receiver side */
     struct ompi_proc_t* ompi_proc;
 #if OPAL_ENABLE_MULTI_THREADS
@@ -56,6 +59,7 @@ struct mca_pml_comm_t {
 #endif
     opal_mutex_t matching_lock;   /**< matching lock */
     opal_list_t wild_receives;    /**< queue of unmatched wild (source process 
not specified) receives */
+    opal_list_t unexpected_procs; /**< list of procs with unexpected messages 
*/
     mca_pml_ob1_comm_proc_t* procs;
     size_t num_procs;
     size_t last_probed;
diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c 
b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c
index c8661b7..fe28832 100644
--- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c
+++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c
@@ -546,6 +546,10 @@ match_one(mca_btl_base_module_t *btl,
             return match;
         }
 
+        /* add this proc to the list of procs with unexpected messages */
+        if (0 == opal_list_get_size (&proc->unexpected_frags)) {
+            opal_list_append (&comm->unexpected_procs, &proc->super);
+        }
         /* if no match found, place on unexpected queue */
         append_frag_to_list(&proc->unexpected_frags, btl, hdr, segments,
                             num_segments, frag);
@@ -562,10 +566,7 @@ static mca_pml_ob1_recv_frag_t* 
check_cantmatch_for_match(mca_pml_ob1_comm_proc_
     /* search the list for a fragment from the send with sequence
      * number next_msg_seq_expected
      */
-    for(frag = 
(mca_pml_ob1_recv_frag_t*)opal_list_get_first(&proc->frags_cant_match);
-        frag != 
(mca_pml_ob1_recv_frag_t*)opal_list_get_end(&proc->frags_cant_match);
-        frag = (mca_pml_ob1_recv_frag_t*)opal_list_get_next(frag))
-    {
+    OPAL_LIST_FOREACH(frag, &proc->frags_cant_match, mca_pml_ob1_recv_frag_t) {
         mca_pml_ob1_match_hdr_t* hdr = &frag->hdr.hdr_match;
         /*
          * If the message has the next expected seq from that proc...
diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c 
b/ompi/mca/pml/ob1/pml_ob1_recvreq.c
index 7c8853f..340eec2 100644
--- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c
+++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.c
@@ -13,7 +13,7 @@
  * Copyright (c) 2008      UT-Battelle, LLC. All rights reserved.
  * Copyright (c) 2011      Sandia National Laboratories. All rights reserved.
  * Copyright (c) 2012-2013 NVIDIA Corporation.  All rights reserved.
- * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
+ * Copyright (c) 2011-2014 Los Alamos National Security, LLC. All rights
  *                         reserved.
  * Copyright (c) 2012      FUJITSU LIMITED.  All rights reserved.
  * $COPYRIGHT$
@@ -1066,7 +1066,6 @@ recv_req_match_specific_proc( const 
mca_pml_ob1_recv_request_t *req,
                               mca_pml_ob1_comm_proc_t *proc )
 {
     opal_list_t* unexpected_frags = &proc->unexpected_frags;
-    opal_list_item_t *i;
     mca_pml_ob1_recv_frag_t* frag;
     int tag = req->req_recv.req_base.req_tag;
 
@@ -1074,20 +1073,12 @@ recv_req_match_specific_proc( const 
mca_pml_ob1_recv_request_t *req,
         return NULL;
 
     if( OMPI_ANY_TAG == tag ) {
-        for (i =  opal_list_get_first(unexpected_frags);
-             i != opal_list_get_end(unexpected_frags);
-             i =  opal_list_get_next(i)) {
-            frag = (mca_pml_ob1_recv_frag_t*)i;
-            
+        OPAL_LIST_FOREACH (frag, unexpected_frags, mca_pml_ob1_recv_frag_t) {
             if( frag->hdr.hdr_match.hdr_tag >= 0 )
                 return frag;
         }
     } else {
-        for (i =  opal_list_get_first(unexpected_frags);
-             i != opal_list_get_end(unexpected_frags);
-             i =  opal_list_get_next(i)) {
-            frag = (mca_pml_ob1_recv_frag_t*)i;
-            
+        OPAL_LIST_FOREACH (frag, unexpected_frags, mca_pml_ob1_recv_frag_t) {
             if( frag->hdr.hdr_match.hdr_tag == tag )
                 return frag;
         }
@@ -1104,37 +1095,32 @@ recv_req_match_wild( mca_pml_ob1_recv_request_t* req,
                      mca_pml_ob1_comm_proc_t **p)
 {
     mca_pml_ob1_comm_t* comm = req->req_recv.req_base.req_comm->c_pml_comm;
-    mca_pml_ob1_comm_proc_t* proc = comm->procs;
-    size_t i;
+    mca_pml_ob1_comm_proc_t* proc;
 
     /*
      * Loop over all the outstanding messages to find one that matches.
      * There is an outer loop over lists of messages from each
      * process, then an inner loop over the messages from the
      * process.
-     *
-     * In order to avoid starvation do this in a round-robin fashion.
      */
-    for (i = comm->last_probed + 1; i < comm->num_procs; i++) {
-        mca_pml_ob1_recv_frag_t* frag;
-
-        /* loop over messages from the current proc */
-        if((frag = recv_req_match_specific_proc(req, &proc[i]))) {
-            *p = &proc[i];
-            comm->last_probed = i;
-            req->req_recv.req_base.req_proc = proc[i].ompi_proc;
-            prepare_recv_req_converter(req);
-            return frag; /* match found */
-        }
-    }
-    for (i = 0; i <= comm->last_probed; i++) {
-        mca_pml_ob1_recv_frag_t* frag;
-
-        /* loop over messages from the current proc */
-        if((frag = recv_req_match_specific_proc(req, &proc[i]))) {
-            *p = &proc[i];
-            comm->last_probed = i;
-            req->req_recv.req_base.req_proc = proc[i].ompi_proc;
+    OPAL_LIST_FOREACH(proc, &comm->unexpected_procs, mca_pml_ob1_comm_proc_t) {
+        mca_pml_ob1_recv_frag_t *frag = recv_req_match_specific_proc (req, 
proc);
+        if (NULL != frag) {
+            *p = proc;
+
+            opal_list_remove_item (&comm->unexpected_procs, &proc->super);
+            /* if this is a probe request don't move the proc to the end */
+            if (OPAL_LIKELY(MCA_PML_REQUEST_PROBE != 
req->req_recv.req_base.req_type &&
+                            MCA_PML_REQUEST_IPROBE != 
req->req_recv.req_base.req_type)) {
+                /* to avoid starvation move this proc to the end of the list */
+                opal_list_append (&comm->unexpected_procs, &proc->super);
+            } else {
+                /* this is a probe request. the caller will most likely call 
MPI_Recv
+                 * to complete this request so move it to the front of the 
queue */
+                opal_list_prepend (&comm->unexpected_procs, &proc->super);
+            }
+
+            req->req_recv.req_base.req_proc = proc->ompi_proc;
             prepare_recv_req_converter(req);
             return frag; /* match found */
         }
@@ -1209,6 +1195,9 @@ void 
mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
         req->req_match_received = false;
         OPAL_THREAD_UNLOCK(&comm->matching_lock);
     } else {
+        opal_list_remove_item(&proc->unexpected_frags,
+                              (opal_list_item_t*)frag);
+
         if(OPAL_LIKELY(!IS_PROB_REQ(req))) {
             PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_MATCH_UNEX,
                                     &(req->req_recv.req_base), PERUSE_RECV);
@@ -1223,8 +1212,10 @@ void 
mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
             PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_SEARCH_UNEX_Q_END,
                                     &(req->req_recv.req_base), PERUSE_RECV);
 
-            opal_list_remove_item(&proc->unexpected_frags,
-                                  (opal_list_item_t*)frag);
+            /* no more unexpected messages on this proc */
+            if (0 == opal_list_get_size (&proc->unexpected_frags)) {
+                opal_list_remove_item (&comm->unexpected_procs, &proc->super);
+            }
             OPAL_THREAD_UNLOCK(&comm->matching_lock);
             
             switch(hdr->hdr_common.hdr_type) {
@@ -1253,8 +1244,10 @@ void 
mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
                during the end of mprobe.  The request will then be
                "recreated" as a receive request, and the frag will be
                restarted with this request during mrecv */
-            opal_list_remove_item(&proc->unexpected_frags,
-                                  (opal_list_item_t*)frag);
+            /* no more unexpected messages on this proc */
+            if (0 == opal_list_get_size (&proc->unexpected_frags)) {
+                opal_list_remove_item (&comm->unexpected_procs, &proc->super);
+            }
             OPAL_THREAD_UNLOCK(&comm->matching_lock);
 
             req->req_recv.req_base.req_addr = frag;
@@ -1262,6 +1255,8 @@ void 
mca_pml_ob1_recv_req_start(mca_pml_ob1_recv_request_t *req)
                                                    frag->segments, 
frag->num_segments);
 
         } else {
+            opal_list_prepend(&proc->unexpected_frags,
+                              (opal_list_item_t*)frag);
             OPAL_THREAD_UNLOCK(&comm->matching_lock);
             mca_pml_ob1_recv_request_matched_probe(req, frag->btl,
                                                    frag->segments, 
frag->num_segments);
diff --git a/ompi/mpi/c/intercomm_merge.c b/ompi/mpi/c/intercomm_merge.c
index 0032646..970bb90 100644
--- a/ompi/mpi/c/intercomm_merge.c
+++ b/ompi/mpi/c/intercomm_merge.c
@@ -104,6 +104,10 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high,
                          new_group_pointer,        /* local group */
                          NULL                      /* remote group */
                          );
+    ompi_group_decrement_proc_count(new_group_pointer);
+    OBJ_RELEASE(new_group_pointer);
+    new_group_pointer = MPI_GROUP_NULL;
+
     if ( NULL == newcomp ) {
         rc = MPI_ERR_INTERN;
         goto exit;
@@ -112,10 +116,6 @@ int MPI_Intercomm_merge(MPI_Comm intercomm, int high,
         goto exit;
     }
 
-    ompi_group_decrement_proc_count(new_group_pointer);
-    OBJ_RELEASE(new_group_pointer);
-    new_group_pointer = MPI_GROUP_NULL;
-
     /* Determine context id. It is identical to f_2_c_handle */
     rc = ompi_comm_nextcid ( newcomp,              /* new comm */ 
                              intercomm,            /* old comm */

Attachment: pgpDS6EcM95BT.pgp
Description: PGP signature

Reply via email to