Hi all,
whatever this fixed/changed, I no longer get corrupted memory in the tuned data segment hung off each communicator! I'm still testing to see whether I get Tim P's error.
G

On Sat, 14 Jan 2006 bosi...@osl.iu.edu wrote:

Author: bosilca
Date: 2006-01-14 15:21:44 -0500 (Sat, 14 Jan 2006)
New Revision: 8692

Modified:
  trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c
  trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h
  trunk/ompi/mca/btl/tcp/btl_tcp_frag.c
  trunk/ompi/mca/btl/tcp/btl_tcp_frag.h
Log:
A better implementation for the TCP endpoint cache + few comments.


Modified: trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c   2006-01-14 20:19:01 UTC (rev 
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c   2006-01-14 20:21:44 UTC (rev 
8692)
@@ -79,7 +79,7 @@
    endpoint->endpoint_nbo = false;
#if MCA_BTL_TCP_ENDPOINT_CACHE
    endpoint->endpoint_cache        = NULL;
-    endpoint->endpoint_cache_pos    = 0;
+    endpoint->endpoint_cache_pos    = NULL;
    endpoint->endpoint_cache_length = 0;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
    OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
@@ -187,21 +187,20 @@
static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t* 
btl_endpoint, int sd)
{
#if MCA_BTL_TCP_ENDPOINT_CACHE
-    btl_endpoint->endpoint_cache = 
(char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
+    btl_endpoint->endpoint_cache     = 
(char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
+    btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */

-    opal_event_set(
-        &btl_endpoint->endpoint_recv_event,
-        btl_endpoint->endpoint_sd,
-        OPAL_EV_READ|OPAL_EV_PERSIST,
-        mca_btl_tcp_endpoint_recv_handler,
-        btl_endpoint);
-    opal_event_set(
-        &btl_endpoint->endpoint_send_event,
-        btl_endpoint->endpoint_sd,
-        OPAL_EV_WRITE|OPAL_EV_PERSIST,
-        mca_btl_tcp_endpoint_send_handler,
-        btl_endpoint);
+    opal_event_set( &btl_endpoint->endpoint_recv_event,
+                   btl_endpoint->endpoint_sd,
+                   OPAL_EV_READ|OPAL_EV_PERSIST,
+                   mca_btl_tcp_endpoint_recv_handler,
+                   btl_endpoint );
+    opal_event_set( &btl_endpoint->endpoint_send_event,
+                   btl_endpoint->endpoint_sd,
+                   OPAL_EV_WRITE|OPAL_EV_PERSIST,
+                   mca_btl_tcp_endpoint_send_handler,
+                   btl_endpoint);
}


@@ -357,7 +356,9 @@
        btl_endpoint->endpoint_sd = -1;
#if MCA_BTL_TCP_ENDPOINT_CACHE
        free( btl_endpoint->endpoint_cache );
-        btl_endpoint->endpoint_cache = NULL;
+        btl_endpoint->endpoint_cache        = NULL;
+        btl_endpoint->endpoint_cache_pos    = NULL;
+        btl_endpoint->endpoint_cache_length = 0;
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
    }
    btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
@@ -619,13 +620,12 @@
            }

#if MCA_BTL_TCP_ENDPOINT_CACHE
-            btl_endpoint->endpoint_cache_pos = 0;
+            assert( 0 == btl_endpoint->endpoint_cache_length );
        data_still_pending_on_endpoint:
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
            /* check for completion of non-blocking recv on the current 
fragment */
            if(mca_btl_tcp_frag_recv(frag, sd) == false) {
                btl_endpoint->endpoint_recv_frag = frag;
-                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            } else {
                btl_endpoint->endpoint_recv_frag = NULL;
                switch(frag->hdr.type) {
@@ -636,39 +636,37 @@
                        break;
                    }
                default:
-                    {
-                        break;
-                    }
+                    break;
                }
#if MCA_BTL_TCP_ENDPOINT_CACHE
                if( 0 != btl_endpoint->endpoint_cache_length ) {
+                   /* If the cache still contains some data we can reuse the 
same fragment
+                    * until we flush it completely.
+                    */
                    MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
                    goto data_still_pending_on_endpoint;
                }
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
                MCA_BTL_TCP_FRAG_RETURN_MAX(frag);
-                OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
            }
-            break;
-        }
-    case MCA_BTL_TCP_SHUTDOWN:
-        {
            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
+            assert( 0 == btl_endpoint->endpoint_cache_length );
            break;
        }
+    case MCA_BTL_TCP_SHUTDOWN:
+        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
+        break;
    default:
-        {
-            OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
-            BTL_ERROR(("invalid socket state(%d)", 
btl_endpoint->endpoint_state));
-            mca_btl_tcp_endpoint_close(btl_endpoint);
-            break;
-        }
+        OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
+        BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
+        mca_btl_tcp_endpoint_close(btl_endpoint);
+        break;
    }
}


/*
- * A file descriptor is available/ready for send. Check the state
+ * A file descriptor is available/ready for send. Check the state
 * of the socket and take the appropriate action.
 */

@@ -680,7 +678,7 @@
    case MCA_BTL_TCP_CONNECTING:
        mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
        break;
-    case MCA_BTL_TCP_CONNECTED:
+    case MCA_BTL_TCP_CONNECTED:
        {
        /* complete the current send */
        do {

Modified: trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h   2006-01-14 20:19:01 UTC (rev 
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h   2006-01-14 20:21:44 UTC (rev 
8692)
@@ -60,9 +60,9 @@
    struct mca_btl_tcp_addr_t*      endpoint_addr;         /**< address of 
endpoint */
    int                             endpoint_sd;           /**< socket 
connection to endpoint */
#if MCA_BTL_TCP_ENDPOINT_CACHE
-    char*                           endpoint_cache;        /**< cache for the 
recv (reduce the number of recv syscall */
-    size_t                          endpoint_cache_pos;    /**< */
-    size_t                          endpoint_cache_length; /**< */
+    char*                           endpoint_cache;        /**< cache for the 
recv (reduce the number of recv syscall) */
+    char*                           endpoint_cache_pos;    /**< current 
position in the cache */
+    size_t                          endpoint_cache_length; /**< length of the 
data in the cache */
#endif  /* MCA_BTL_TCP_ENDPOINT_CACHE */
    struct mca_btl_tcp_frag_t*      endpoint_send_frag;    /**< current send 
frag being processed */
    struct mca_btl_tcp_frag_t*      endpoint_recv_frag;    /**< current recv 
frag being processed */

Modified: trunk/ompi/mca/btl/tcp/btl_tcp_frag.c
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_frag.c       2006-01-14 20:19:01 UTC (rev 
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_frag.c       2006-01-14 20:21:44 UTC (rev 
8692)
@@ -119,29 +119,36 @@
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
{
    int cnt;
-    size_t i, num_vecs = frag->iov_cnt;
+    size_t i, num_vecs;
    mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;

 repeat:
-
+    num_vecs = frag->iov_cnt;
#if MCA_BTL_TCP_ENDPOINT_CACHE
    if( 0 != btl_endpoint->endpoint_cache_length ) {
        size_t length = btl_endpoint->endpoint_cache_length;
+        /* It's strange at first look, but cnt has to be set to the full 
amount of data available.
+         * After going to advance_iov_position we will use cnt to detect 
whether there is still some
+         * data pending.
+         */
        cnt = btl_endpoint->endpoint_cache_length;
        for( i = 0; i < frag->iov_cnt; i++ ) {
            if( length > frag->iov_ptr[i].iov_len )
                length = frag->iov_ptr[0].iov_len;
-            memcpy( frag->iov_ptr[i].iov_base,
-                    btl_endpoint->endpoint_cache + 
btl_endpoint->endpoint_cache_pos,
-                    length );
+            memcpy( frag->iov_ptr[i].iov_base, 
btl_endpoint->endpoint_cache_pos, length );
            btl_endpoint->endpoint_cache_pos += length;
            btl_endpoint->endpoint_cache_length -= length;
            length = btl_endpoint->endpoint_cache_length;
-            if( 0 == length ) break;
+            if( 0 == length ) {
+               btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
+               break;
+           }
        }
        goto advance_iov_position;
    }
-
+    /* What happens if all iovecs are used by the fragment? It still works, 
as we reserve one
+     * iovec for the caching in the fragment structure (the +1).
+     */
    frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache;
    frag->iov_ptr[num_vecs].iov_len  = mca_btl_tcp_component.tcp_endpoint_cache;
    num_vecs++;
@@ -162,15 +169,13 @@
                             frag->iov_ptr[0].iov_base, 
frag->iov_ptr[0].iov_len,
                             strerror(ompi_socket_errno), frag->iov_cnt );
            default:
-                {
-                    opal_output(0, "mca_btl_tcp_frag_send: writev failed with 
errno=%d",
-                                ompi_socket_errno);
-                    mca_btl_tcp_endpoint_close(btl_endpoint);
-                    return false;
-                }
+                opal_output(0, "mca_btl_tcp_frag_send: writev failed with 
errno=%d",
+                            ompi_socket_errno);
+                mca_btl_tcp_endpoint_close(btl_endpoint);
+                return false;
            }
        }
-        if(cnt == 0) {
+        if( cnt == 0 ) {
            mca_btl_tcp_endpoint_close(btl_endpoint);
            return false;
        }
@@ -180,8 +185,8 @@
 advance_iov_position:
    /* if the write didn't complete - update the iovec state */
    num_vecs = frag->iov_cnt;
-    for(i=0; i<num_vecs; i++) {
-        if(cnt >= (int)frag->iov_ptr->iov_len) {
+    for( i = 0; i < num_vecs; i++ ) {
+        if( cnt >= (int)frag->iov_ptr->iov_len ) {
            cnt -= frag->iov_ptr->iov_len;
            frag->iov_idx++;
            frag->iov_ptr++;
@@ -190,7 +195,7 @@
            frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
                (((unsigned char*)frag->iov_ptr->iov_base) + cnt);
            frag->iov_ptr->iov_len -= cnt;
-           cnt = 0;
+            cnt = 0;
            break;
        }
    }

Modified: trunk/ompi/mca/btl/tcp/btl_tcp_frag.h
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_frag.h       2006-01-14 20:19:01 UTC (rev 
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_frag.h       2006-01-14 20:21:44 UTC (rev 
8692)
@@ -49,7 +49,7 @@
    struct mca_btl_base_endpoint_t *endpoint;
    struct mca_btl_tcp_module_t* btl;
    mca_btl_tcp_hdr_t hdr;
-    struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER];
+    struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
    struct iovec *iov_ptr;
    size_t iov_cnt;
    size_t iov_idx;

_______________________________________________
svn mailing list
s...@open-mpi.org
http://www.open-mpi.org/mailman/listinfo.cgi/svn



Thanks,
        Graham.
----------------------------------------------------------------------
Dr Graham E. Fagg       | Distributed, Parallel and Meta-Computing
Innovative Computing Lab. PVM3.4, HARNESS, FT-MPI, SNIPE & Open MPI
Computer Science Dept   | Suite 203, 1122 Volunteer Blvd,
University of Tennessee | Knoxville, Tennessee, USA. TN 37996-3450
Email: f...@cs.utk.edu  | Phone:+1(865)974-5790 | Fax:+1(865)974-8296
Broken complex systems are always derived from working simple systems
----------------------------------------------------------------------

Reply via email to