Hi all,
whatever this fixed/changed, I no longer get corrupted memory in the
tuned data segment hung off each communicator...! I'm still testing to
see if I get Tim P's error.
G
On Sat, 14 Jan 2006 bosi...@osl.iu.edu wrote:
Author: bosilca
Date: 2006-01-14 15:21:44 -0500 (Sat, 14 Jan 2006)
New Revision: 8692
Modified:
trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c
trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h
trunk/ompi/mca/btl/tcp/btl_tcp_frag.c
trunk/ompi/mca/btl/tcp/btl_tcp_frag.h
Log:
A better implementation for the TCP endpoint cache + few comments.
Modified: trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c 2006-01-14 20:19:01 UTC (rev
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.c 2006-01-14 20:21:44 UTC (rev
8692)
@@ -79,7 +79,7 @@
endpoint->endpoint_nbo = false;
#if MCA_BTL_TCP_ENDPOINT_CACHE
endpoint->endpoint_cache = NULL;
- endpoint->endpoint_cache_pos = 0;
+ endpoint->endpoint_cache_pos = NULL;
endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
OBJ_CONSTRUCT(&endpoint->endpoint_frags, opal_list_t);
@@ -187,21 +187,20 @@
static inline void mca_btl_tcp_endpoint_event_init(mca_btl_base_endpoint_t*
btl_endpoint, int sd)
{
#if MCA_BTL_TCP_ENDPOINT_CACHE
- btl_endpoint->endpoint_cache =
(char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
+ btl_endpoint->endpoint_cache =
(char*)malloc(mca_btl_tcp_component.tcp_endpoint_cache);
+ btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
- opal_event_set(
- &btl_endpoint->endpoint_recv_event,
- btl_endpoint->endpoint_sd,
- OPAL_EV_READ|OPAL_EV_PERSIST,
- mca_btl_tcp_endpoint_recv_handler,
- btl_endpoint);
- opal_event_set(
- &btl_endpoint->endpoint_send_event,
- btl_endpoint->endpoint_sd,
- OPAL_EV_WRITE|OPAL_EV_PERSIST,
- mca_btl_tcp_endpoint_send_handler,
- btl_endpoint);
+ opal_event_set( &btl_endpoint->endpoint_recv_event,
+ btl_endpoint->endpoint_sd,
+ OPAL_EV_READ|OPAL_EV_PERSIST,
+ mca_btl_tcp_endpoint_recv_handler,
+ btl_endpoint );
+ opal_event_set( &btl_endpoint->endpoint_send_event,
+ btl_endpoint->endpoint_sd,
+ OPAL_EV_WRITE|OPAL_EV_PERSIST,
+ mca_btl_tcp_endpoint_send_handler,
+ btl_endpoint);
}
@@ -357,7 +356,9 @@
btl_endpoint->endpoint_sd = -1;
#if MCA_BTL_TCP_ENDPOINT_CACHE
free( btl_endpoint->endpoint_cache );
- btl_endpoint->endpoint_cache = NULL;
+ btl_endpoint->endpoint_cache = NULL;
+ btl_endpoint->endpoint_cache_pos = NULL;
+ btl_endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
}
btl_endpoint->endpoint_state = MCA_BTL_TCP_CLOSED;
@@ -619,13 +620,12 @@
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
- btl_endpoint->endpoint_cache_pos = 0;
+ assert( 0 == btl_endpoint->endpoint_cache_length );
data_still_pending_on_endpoint:
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
/* check for completion of non-blocking recv on the current
fragment */
if(mca_btl_tcp_frag_recv(frag, sd) == false) {
btl_endpoint->endpoint_recv_frag = frag;
- OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
} else {
btl_endpoint->endpoint_recv_frag = NULL;
switch(frag->hdr.type) {
@@ -636,39 +636,37 @@
break;
}
default:
- {
- break;
- }
+ break;
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
+ /* If the cache still contains some data we can reuse the
same fragment
+ * until we flush it completely.
+ */
MCA_BTL_TCP_FRAG_INIT_DST(frag, btl_endpoint);
goto data_still_pending_on_endpoint;
}
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
MCA_BTL_TCP_FRAG_RETURN_MAX(frag);
- OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
}
- break;
- }
- case MCA_BTL_TCP_SHUTDOWN:
- {
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
+ assert( 0 == btl_endpoint->endpoint_cache_length );
break;
}
+ case MCA_BTL_TCP_SHUTDOWN:
+ OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
+ break;
default:
- {
- OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
- BTL_ERROR(("invalid socket state(%d)",
btl_endpoint->endpoint_state));
- mca_btl_tcp_endpoint_close(btl_endpoint);
- break;
- }
+ OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_recv_lock);
+ BTL_ERROR(("invalid socket state(%d)", btl_endpoint->endpoint_state));
+ mca_btl_tcp_endpoint_close(btl_endpoint);
+ break;
}
}
/*
- * A file descriptor is available/ready for send. Check the state
+ * A file descriptor is available/ready for send. Check the state
* of the socket and take the appropriate action.
*/
@@ -680,7 +678,7 @@
case MCA_BTL_TCP_CONNECTING:
mca_btl_tcp_endpoint_complete_connect(btl_endpoint);
break;
- case MCA_BTL_TCP_CONNECTED:
+ case MCA_BTL_TCP_CONNECTED:
{
/* complete the current send */
do {
Modified: trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h 2006-01-14 20:19:01 UTC (rev
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_endpoint.h 2006-01-14 20:21:44 UTC (rev
8692)
@@ -60,9 +60,9 @@
struct mca_btl_tcp_addr_t* endpoint_addr; /**< address of
endpoint */
int endpoint_sd; /**< socket
connection to endpoint */
#if MCA_BTL_TCP_ENDPOINT_CACHE
- char* endpoint_cache; /**< cache for the
recv (reduce the number of recv syscall */
- size_t endpoint_cache_pos; /**< */
- size_t endpoint_cache_length; /**< */
+ char* endpoint_cache; /**< cache for the
recv (reduce the number of recv syscall) */
+ char* endpoint_cache_pos; /**< current
position in the cache */
+ size_t endpoint_cache_length; /**< length of the
data in the cache */
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
struct mca_btl_tcp_frag_t* endpoint_send_frag; /**< current send
frag being processed */
struct mca_btl_tcp_frag_t* endpoint_recv_frag; /**< current recv
frag being processed */
Modified: trunk/ompi/mca/btl/tcp/btl_tcp_frag.c
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_frag.c 2006-01-14 20:19:01 UTC (rev
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_frag.c 2006-01-14 20:21:44 UTC (rev
8692)
@@ -119,29 +119,36 @@
bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t* frag, int sd)
{
int cnt;
- size_t i, num_vecs = frag->iov_cnt;
+ size_t i, num_vecs;
mca_btl_base_endpoint_t* btl_endpoint = frag->endpoint;
repeat:
-
+ num_vecs = frag->iov_cnt;
#if MCA_BTL_TCP_ENDPOINT_CACHE
if( 0 != btl_endpoint->endpoint_cache_length ) {
size_t length = btl_endpoint->endpoint_cache_length;
+ /* It may look strange at first glance, but cnt has to be set to the full
amount of data available.
+ * After going to advance_iov_position we will use cnt to detect if
there is still some
+ * data pending.
+ */
cnt = btl_endpoint->endpoint_cache_length;
for( i = 0; i < frag->iov_cnt; i++ ) {
if( length > frag->iov_ptr[i].iov_len )
length = frag->iov_ptr[0].iov_len;
- memcpy( frag->iov_ptr[i].iov_base,
- btl_endpoint->endpoint_cache +
btl_endpoint->endpoint_cache_pos,
- length );
+ memcpy( frag->iov_ptr[i].iov_base,
btl_endpoint->endpoint_cache_pos, length );
btl_endpoint->endpoint_cache_pos += length;
btl_endpoint->endpoint_cache_length -= length;
length = btl_endpoint->endpoint_cache_length;
- if( 0 == length ) break;
+ if( 0 == length ) {
+ btl_endpoint->endpoint_cache_pos = btl_endpoint->endpoint_cache;
+ break;
+ }
}
goto advance_iov_position;
}
-
+ /* What happens if all iovecs are used by the fragment? It still works,
as we reserve one
+ * iovec for the caching in the fragment structure (the +1).
+ */
frag->iov_ptr[num_vecs].iov_base = btl_endpoint->endpoint_cache;
frag->iov_ptr[num_vecs].iov_len = mca_btl_tcp_component.tcp_endpoint_cache;
num_vecs++;
@@ -162,15 +169,13 @@
frag->iov_ptr[0].iov_base,
frag->iov_ptr[0].iov_len,
strerror(ompi_socket_errno), frag->iov_cnt );
default:
- {
- opal_output(0, "mca_btl_tcp_frag_send: writev failed with
errno=%d",
- ompi_socket_errno);
- mca_btl_tcp_endpoint_close(btl_endpoint);
- return false;
- }
+ opal_output(0, "mca_btl_tcp_frag_send: writev failed with
errno=%d",
+ ompi_socket_errno);
+ mca_btl_tcp_endpoint_close(btl_endpoint);
+ return false;
}
}
- if(cnt == 0) {
+ if( cnt == 0 ) {
mca_btl_tcp_endpoint_close(btl_endpoint);
return false;
}
@@ -180,8 +185,8 @@
advance_iov_position:
/* if the write didn't complete - update the iovec state */
num_vecs = frag->iov_cnt;
- for(i=0; i<num_vecs; i++) {
- if(cnt >= (int)frag->iov_ptr->iov_len) {
+ for( i = 0; i < num_vecs; i++ ) {
+ if( cnt >= (int)frag->iov_ptr->iov_len ) {
cnt -= frag->iov_ptr->iov_len;
frag->iov_idx++;
frag->iov_ptr++;
@@ -190,7 +195,7 @@
frag->iov_ptr->iov_base = (ompi_iov_base_ptr_t)
(((unsigned char*)frag->iov_ptr->iov_base) + cnt);
frag->iov_ptr->iov_len -= cnt;
- cnt = 0;
+ cnt = 0;
break;
}
}
Modified: trunk/ompi/mca/btl/tcp/btl_tcp_frag.h
===================================================================
--- trunk/ompi/mca/btl/tcp/btl_tcp_frag.h 2006-01-14 20:19:01 UTC (rev
8691)
+++ trunk/ompi/mca/btl/tcp/btl_tcp_frag.h 2006-01-14 20:21:44 UTC (rev
8692)
@@ -49,7 +49,7 @@
struct mca_btl_base_endpoint_t *endpoint;
struct mca_btl_tcp_module_t* btl;
mca_btl_tcp_hdr_t hdr;
- struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER];
+ struct iovec iov[MCA_BTL_TCP_FRAG_IOVEC_NUMBER + 1];
struct iovec *iov_ptr;
size_t iov_cnt;
size_t iov_idx;
_______________________________________________
svn mailing list
s...@open-mpi.org
http://www.open-mpi.org/mailman/listinfo.cgi/svn
Thanks,
Graham.
----------------------------------------------------------------------
Dr Graham E. Fagg | Distributed, Parallel and Meta-Computing
Innovative Computing Lab. PVM3.4, HARNESS, FT-MPI, SNIPE & Open MPI
Computer Science Dept | Suite 203, 1122 Volunteer Blvd,
University of Tennessee | Knoxville, Tennessee, USA. TN 37996-3450
Email: f...@cs.utk.edu | Phone:+1(865)974-5790 | Fax:+1(865)974-8296
Broken complex systems are always derived from working simple systems
----------------------------------------------------------------------