Hello,

I took a look at the ADIO Lustre driver last week in an attempt to make collective I/O work for my application. I believe that I've gotten the driver to do the right thing now. With the following patch all of the tests under romio/test pass in my environment. My application, which requires reasonably complex file views, also appears to write correct files now on a Lustre filesystem using collective I/O.

Problems in the noncontig and i_noncontig tests are fixed by changes to ad_lustre_wrstr.c. Basically, I made changes to that file so that it now more closely matches adio/common/ad_write_str.c. The only differences between these files should now be in the Lustre-specific parts. Problems in the large_array test are fixed in ad_lustre_aggregate.c and ad_lustre_wrcoll.c. The major problem was in the handling of the "buf_idx" arrays, which did not correctly track the positions within a process's data buffer of the data being sent to multiple other processes. My tests involved four MPI nodes, four Lustre OSTs, and both four and eight MPI processes.

I also patched ad_lustre_aggregate.c in its calculation of "avail_cb_nodes", using the change previously provided by Pascal Deveze. I don't know whether that has had any effect on my changes, since I did no testing without Pascal's changes.

There are some other changes, such as formatting and the addition of some assertions. They are included in the attached patch, but I could take them out if desired.

The patch should apply cleanly to sources out of a fresh mpich2-1.2.1 tarball. I'm not an old hand at MPI, so please review my patch carefully.

--
Martin

diff -ubr mpich2-1.2.1/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c mpich2-1.2.1-new/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c
--- mpich2-1.2.1/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c	2009-07-02 09:19:27.000000000 -0600
+++ mpich2-1.2.1-new/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c	2010-01-02 20:23:58.000000000 -0700
@@ -24,7 +24,6 @@
      */
     int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes, lflag;
     int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
-    char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char));
 
     /* Get hints value */
     /* stripe size */
@@ -59,20 +58,8 @@
      * so that each OST is accessed by only one or more constant clients. */
     CO_nodes = stripe_count * CO;
     avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, CO_nodes);
-    if (avail_cb_nodes ==  CO_nodes) {
-        do {
-            /* find the divisor of CO_nodes */
-            divisor = 1;
-            do {
-                divisor ++;
-            } while (CO_nodes % divisor);
-            CO_nodes = CO_nodes / divisor;
-            /* if stripe_count*CO is a prime number, change nothing */
-            if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) {
-                avail_cb_nodes = CO_nodes;
-                break;
-            }
-        } while (CO_nodes != 1);
+    while (CO_nodes % avail_cb_nodes != 0) {
+        avail_cb_nodes--;
     }
 
     *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
@@ -80,8 +67,6 @@
     striping_info[0] = stripe_size;
     striping_info[1] = stripe_count;
     striping_info[2] = avail_cb_nodes;
-
-    ADIOI_Free(value);
 }
 
 int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
@@ -125,11 +110,11 @@
                               int *count_my_req_procs_ptr,
 			      int **count_my_req_per_proc_ptr,
 			      ADIOI_Access **my_req_ptr,
-			      int **buf_idx_ptr)
+                              int ***buf_idx_ptr)
 {
     /* Nothing different from ADIOI_Calc_my_req(), except calling
      * ADIOI_Lustre_Calc_aggregator() instead of the old one */
-    int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
+    int *count_my_req_per_proc, count_my_req_procs, **buf_idx;
     int i, l, proc;
     ADIO_Offset avail_len, rem_len, curr_idx, off;
     ADIOI_Access *my_req;
@@ -142,17 +127,6 @@
      * MPI_Alltoall later on.
      */
 
-    buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
-    /* buf_idx is relevant only if buftype_is_contig.
-     * buf_idx[i] gives the index into user_buf where data received
-     * from proc. i should be placed. This allows receives to be done
-     * without extra buffer. This can't be done if buftype is not contig.
-     */
-
-    /* initialize buf_idx to -1 */
-    for (i = 0; i < nprocs; i++)
-	buf_idx[i] = -1;
-
     /* one pass just to calculate how much space to allocate for my_req;
      * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
      */
@@ -186,6 +160,21 @@
 	}
     }
 
+    buf_idx = (int **) ADIOI_Malloc(nprocs * sizeof(int *));
+    /* buf_idx is relevant only if buftype_is_contig.
+     * buf_idx[i] gives the index into user_buf where data received
+     * from proc. i should be placed. This allows receives to be done
+     * without extra buffer. This can't be done if buftype is not contig.
+     */
+
+    /* initialize buf_idx vectors */
+    for (i = 0; i < nprocs; i++) {
+        /* add one to count_my_req_per_proc[i] to avoid zero size
+         * malloc */
+        buf_idx[i] = (int *) ADIOI_Malloc((count_my_req_per_proc[i] + 1)
+                                          * sizeof(int));
+    }
+
     /* now allocate space for my_req, offset, and len */
     *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
     my_req = *my_req_ptr;
@@ -212,15 +201,13 @@
 	    continue;
 	off = offset_list[i];
 	avail_len = len_list[i];
-	proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
-
-	/* for each separate contiguous access from this process */
-	if (buf_idx[proc] == -1)
-	    buf_idx[proc] = (int) curr_idx;
-
+        proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
+                                            striping_info);
 	l = my_req[proc].count;
-	curr_idx += (int) avail_len;	/* NOTE: Why is curr_idx an int?  Fix? */
-
+        ADIOI_Assert(curr_idx == (int) curr_idx);
+        ADIOI_Assert(l < count_my_req_per_proc[proc]);
+        buf_idx[proc][l] = (int) curr_idx;
+        curr_idx += avail_len;
 	rem_len = len_list[i] - avail_len;
 
 	/* store the proc, offset, and len information in an array
@@ -229,6 +216,7 @@
 	 * and the associated count.
 	 */
 	my_req[proc].offsets[l] = off;
+        ADIOI_Assert(avail_len == (int) avail_len);
 	my_req[proc].lens[l] = (int) avail_len;
 	my_req[proc].count++;
 
@@ -237,14 +225,16 @@
 	    avail_len = rem_len;
 	    proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
                                                 striping_info);
-	    if (buf_idx[proc] == -1)
-		buf_idx[proc] = (int) curr_idx;
-
 	    l = my_req[proc].count;
+            ADIOI_Assert(curr_idx == (int) curr_idx);
+            ADIOI_Assert(l < count_my_req_per_proc[proc]);
+            buf_idx[proc][l] = (int) curr_idx;
+
 	    curr_idx += avail_len;
 	    rem_len -= avail_len;
 
 	    my_req[proc].offsets[l] = off;
+            ADIOI_Assert(avail_len == (int) avail_len);
 	    my_req[proc].lens[l] = (int) avail_len;
 	    my_req[proc].count++;
 	}
diff -ubr mpich2-1.2.1/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c mpich2-1.2.1-new/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c
--- mpich2-1.2.1/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c	2009-07-02 09:19:27.000000000 -0600
+++ mpich2-1.2.1-new/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c	2010-01-04 08:44:56.000000000 -0700
@@ -21,7 +21,7 @@
 					ADIO_Offset *len_list,
 					int contig_access_count,
 					int *striping_info,
-					int *buf_idx, int *error_code);
+                                        int **buf_idx, int *error_code);
 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
 					  ADIOI_Flatlist_node *flat_buf,
 					  char **send_buf,
@@ -83,7 +83,7 @@
     ADIO_Offset orig_fp, start_offset, end_offset, off;
     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
     ADIO_Offset *len_list = NULL;
-    int *buf_idx = NULL, *striping_info = NULL;
+    int **buf_idx = NULL, *striping_info = NULL;
     int old_error, tmp_error;
 
     MPI_Comm_size(fd->comm, &nprocs);
@@ -153,7 +153,7 @@
 	ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
 	if (buftype_is_contig && filetype_is_contig) {
 	    if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
-		off = fd->disp + (fd->etype_size) * offset;
+                off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
 		ADIO_WriteContig(fd, buf, count, datatype,
 				 ADIO_EXPLICIT_OFFSET,
 				 off, status, error_code);
@@ -175,7 +175,8 @@
      */
     ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
                              striping_info, nprocs, &count_my_req_procs,
-                             &count_my_req_per_proc, &my_req, &buf_idx);
+                             &count_my_req_per_proc, &my_req,
+                             &buf_idx);
 
     /* based on everyone's my_req, calculate what requests of other processes
      * will be accessed by this process.
@@ -192,9 +193,9 @@
 
     /* exchange data and write in sizes of no more than stripe_size. */
     ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
-                                others_req, my_req,
-                                offset_list, len_list, contig_access_count,
-				striping_info, buf_idx, error_code);
+                                others_req, my_req, offset_list, len_list,
+                                contig_access_count, striping_info,
+                                buf_idx, error_code);
 
     /* If this collective write is followed by an independent write,
      * it's possible to have those subsequent writes on other processes
@@ -253,6 +254,9 @@
 	}
     }
     ADIOI_Free(my_req);
+    for (i = 0; i < nprocs; i++) {
+        ADIOI_Free(buf_idx[i]);
+    }
     ADIOI_Free(buf_idx);
     ADIOI_Free(offset_list);
     ADIOI_Free(len_list);
@@ -286,7 +290,7 @@
 					ADIO_Offset *offset_list,
                                         ADIO_Offset *len_list, 
 					int contig_access_count,
-					int *striping_info, int *buf_idx,
+                                        int *striping_info, int **buf_idx,
                                         int *error_code)
 {
     /* Send data to appropriate processes and write in sizes of no more
@@ -308,6 +312,7 @@
     int *send_curr_offlen_ptr, *send_size;
     int *partial_recv, *sent_to_proc, *recv_start_pos;
     int *send_buf_idx, *curr_to_proc, *done_to_proc;
+    int *this_buf_idx;
     char *write_buf = NULL;
     MPI_Status status;
     ADIOI_Flatlist_node *flat_buf = NULL;
@@ -355,7 +360,9 @@
      * and ntimes=whole_file_portion/step_size
      */
     step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
-    max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1);
+    max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
+        + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
+/*     max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1); */
     if (ntimes)
 	write_buf = (char *) ADIOI_Malloc(stripe_size);
 
@@ -395,6 +402,8 @@
     done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
     /* Above three are used in ADIOI_Fill_send_buffer */
 
+    this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+
     recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
     /* used to store the starting value of recv_curr_offlen_ptr[i] in
        this iteration */
@@ -461,11 +470,13 @@
 
         off = off_list[m];
         max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
-        real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off,
+        real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
+                                    off,
                                     end_loc - off + 1);
 
 	for (i = 0; i < nprocs; i++) {
             if (my_req[i].count) {
+                this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
                 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
                     send_off = my_req[i].offsets[j];
                     send_len = my_req[i].lens[j];
@@ -484,6 +495,7 @@
                     req_len = others_req[i].lens[j];
 		    if (req_off < iter_st_off + max_size) {
 			recv_count[i]++;
+                        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
 			MPI_Address(write_buf + req_off - off,
 				    &(others_req[i].mem_ptrs[j]));
                         recv_size[i] += req_len;
@@ -503,7 +515,7 @@
                                      buftype_is_contig, contig_access_count,
                                      striping_info, others_req, send_buf_idx,
                                      curr_to_proc, done_to_proc, &hole, m,
-                                     buftype_extent, buf_idx, error_code);
+                                     buftype_extent, this_buf_idx, error_code);
 	if (*error_code != MPI_SUCCESS)
             goto over;
 
@@ -564,6 +576,7 @@
     ADIOI_Free(send_buf_idx);
     ADIOI_Free(curr_to_proc);
     ADIOI_Free(done_to_proc);
+    ADIOI_Free(this_buf_idx);
     ADIOI_Free(off_list);
 }
 
@@ -714,13 +727,14 @@
 	j = 0;
 	for (i = 0; i < nprocs; i++)
 	    if (send_size[i]) {
+                ADIOI_Assert(buf_idx[i] != -1);
 		MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
 			  MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
 			  send_req + j);
 		j++;
-		buf_idx[i] += send_size[i];
 	    }
-    } else if (nprocs_send) {
+    } else
+        if (nprocs_send) {
 	/* buftype is not contig */
 	send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
 	for (i = 0; i < nprocs; i++)
@@ -803,7 +817,7 @@
                 n_buftypes++; \
             } \
             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
-                              n_buftypes*buftype_extent; \
+                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;  \
             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
         } \
         buf_incr -= size_in_buf; \
@@ -815,6 +829,8 @@
 { \
     while (size) { \
         size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
+        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
+        ADIOI_Assert(size_in_buf == (size_t)size_in_buf);               \
         memcpy(&(send_buf[p][send_buf_idx[p]]), \
                ((char *) buf) + user_buf_idx, size_in_buf); \
         send_buf_idx[p] += size_in_buf; \
@@ -827,7 +843,7 @@
                 n_buftypes++; \
             } \
             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
-                              n_buftypes*buftype_extent; \
+                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;    \
             flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
         } \
         size -= size_in_buf; \
@@ -900,14 +916,17 @@
 					       send_buf_idx[p]);
 			buf_incr = done_to_proc[p] - curr_to_proc[p];
 			ADIOI_BUF_INCR
+                        ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
 			    buf_incr = (int) (curr_to_proc[p] + len -
 					      done_to_proc[p]);
+                        ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
 			curr_to_proc[p] = done_to_proc[p] + size;
 		        ADIOI_BUF_COPY
                     } else {
 			size = (int) ADIOI_MIN(len, send_size[p] -
 					       send_buf_idx[p]);
 			buf_incr = (int) len;
+                        ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
 			curr_to_proc[p] += size;
 		        ADIOI_BUF_COPY
                     }
@@ -918,6 +937,7 @@
 			jj++;
 		    }
 		} else {
+                    ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
 		    curr_to_proc[p] += (int) len;
 		    buf_incr = (int) len;
 		    ADIOI_BUF_INCR
diff -ubr mpich2-1.2.1/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c mpich2-1.2.1-new/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c
--- mpich2-1.2.1/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c	2009-03-13 09:30:30.000000000 -0600
+++ mpich2-1.2.1-new/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c	2010-01-04 08:50:51.000000000 -0700
@@ -16,12 +16,14 @@
     if (req_off >= writebuf_off + writebuf_len) { \
         if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
-                  ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+                             ADIO_EXPLICIT_OFFSET, writebuf_off,        \
+                             &status1, error_code);                     \
            if (!(fd->atomicity)) \
                 ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
-                                                  MPIR_ERR_RECOVERABLE, myname, \
+                                                   MPIR_ERR_RECOVERABLE, \
+                                                   myname,              \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
@@ -30,25 +32,30 @@
         } \
 	writebuf_off = req_off; \
         /* stripe_size alignment */ \
-        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                        (writebuf_off / stripe_size + 1) * \
-                                       stripe_size - writebuf_off);\
+                                            stripe_size - writebuf_off); \
 	if (!(fd->atomicity)) \
             ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
-	ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
+        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE,           \
+                        ADIO_EXPLICIT_OFFSET,                           \
                         writebuf_off, &status1, error_code); \
 	if (*error_code != MPI_SUCCESS) { \
 	    *error_code = MPIO_Err_create_code(*error_code, \
-					       MPIR_ERR_RECOVERABLE, myname, \
+                                               MPIR_ERR_RECOVERABLE,    \
+                                               myname,                  \
 					       __LINE__, MPI_ERR_IO, \
 					       "**iowsrc", 0); \
             ADIOI_Free(writebuf); \
 	    return; \
 	} \
     } \
-    write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
-    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
-    while (write_sz != req_len) {\
+    write_sz = (unsigned) (ADIOI_MIN(req_len,                           \
+                                     writebuf_off + writebuf_len - req_off)); \
+    ADIOI_Assert((ADIO_Offset)write_sz ==                               \
+                 ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
+    memcpy(writebuf + req_off - writebuf_off, (char *)buf +userbuf_off, write_sz); \
+    while (write_sz != req_len) {                                       \
         ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                          ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
         if (!(fd->atomicity)) \
@@ -65,12 +72,13 @@
         userbuf_off += write_sz; \
         writebuf_off += writebuf_len; \
         /* stripe_size alignment */ \
-        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                        (writebuf_off / stripe_size + 1) * \
-                                       stripe_size - writebuf_off);\
+                                            stripe_size - writebuf_off); \
 	if (!(fd->atomicity)) \
             ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
-        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
+        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE,           \
+                        ADIO_EXPLICIT_OFFSET,                           \
                         writebuf_off, &status1, error_code); \
 	if (*error_code != MPI_SUCCESS) { \
 	    *error_code = MPIO_Err_create_code(*error_code, \
@@ -81,7 +89,7 @@
 	    return; \
 	} \
         write_sz = ADIOI_MIN(req_len, writebuf_len); \
-        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
+        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);          \
     } \
 }
 
@@ -91,15 +99,29 @@
 #define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
 { \
     if (req_off >= writebuf_off + writebuf_len) { \
+        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,          \
+                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,  \
+                         error_code);                                   \
+        if (*error_code != MPI_SUCCESS) {                               \
+            *error_code = MPIO_Err_create_code(*error_code,             \
+                                               MPIR_ERR_RECOVERABLE,    \
+                                               myname,                  \
+                                               __LINE__, MPI_ERR_IO,    \
+                                               "**iowswc", 0);          \
+            ADIOI_Free(writebuf);                                       \
+            return;                                                     \
+        }                                                               \
 	writebuf_off = req_off; \
         /* stripe_size alignment */ \
-        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                        (writebuf_off / stripe_size + 1) * \
-                                       stripe_size - writebuf_off);\
+                                            stripe_size - writebuf_off); \
     } \
-    write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
-    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
-    while (req_len) { \
+    write_sz = (unsigned) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
+    ADIOI_Assert((ADIO_Offset)write_sz == ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
+    memcpy(writebuf + req_off - writebuf_off,                           \
+           (char *)buf + userbuf_off, write_sz);                        \
+    while (write_sz != req_len) {                                       \
         ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                          ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
         if (*error_code != MPI_SUCCESS) { \
@@ -114,11 +136,11 @@
         userbuf_off += write_sz; \
         writebuf_off += writebuf_len; \
         /* stripe_size alignment */ \
-        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                        (writebuf_off / stripe_size + 1) * \
-                                       stripe_size - writebuf_off);\
+                                            stripe_size - writebuf_off); \
         write_sz = ADIOI_MIN(req_len, writebuf_len); \
-        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
+        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);          \
     } \
 }
 
@@ -129,23 +151,22 @@
 {
     /* offset is in units of etype relative to the filetype. */
     ADIOI_Flatlist_node *flat_buf, *flat_file;
-    int i, j, k, bwr_size, fwr_size = 0, st_index = 0;
-    int bufsize, num, size, sum, n_etypes_in_filetype, size_in_filetype;
-    int n_filetypes, etype_in_filetype;
-    ADIO_Offset abs_off_in_filetype = 0;
-    int filetype_size, etype_size, buftype_size, req_len;
+    ADIO_Offset i_offset, sum, size_in_filetype;
+    int i, j, k, st_index=0;
+    int n_etypes_in_filetype;
+    ADIO_Offset num, size, n_filetypes, etype_in_filetype, st_n_filetypes;
+    ADIO_Offset abs_off_in_filetype=0;
+    int filetype_size, etype_size, buftype_size;
     MPI_Aint filetype_extent, buftype_extent;
     int buf_count, buftype_is_contig, filetype_is_contig;
     ADIO_Offset userbuf_off;
-    ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off;
+    ADIO_Offset off, req_off, disp, end_offset=0, writebuf_off, start_off;
     char *writebuf;
-    int flag, st_fwr_size, st_n_filetypes, writebuf_len, write_sz;
+    unsigned bufsize, writebuf_len, write_sz;
     ADIO_Status status1;
-    int new_bwr_size, new_fwr_size;
+    ADIO_Offset new_bwr_size, new_fwr_size, st_fwr_size, fwr_size=0, bwr_size, req_len;
     int stripe_size;
     static char myname[] = "ADIOI_LUSTRE_WriteStrided";
-    int myrank;
-    MPI_Comm_rank(fd->comm, &myrank);
 
     if (fd->hints->ds_write == ADIOI_HINT_DISABLE) {
 	/* if user has disabled data sieving on writes, use naive
@@ -176,6 +197,7 @@
     MPI_Type_extent(datatype, &buftype_extent);
     etype_size = fd->etype_size;
 
+    ADIOI_Assert((buftype_size * count) == ((ADIO_Offset)(unsigned)buftype_size * (ADIO_Offset)count));
     bufsize = buftype_size * count;
 
     /* get striping info */
@@ -190,16 +212,14 @@
 	    flat_buf = flat_buf->next;
 
 	off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
-	    fd->disp + etype_size * offset;
+            fd->disp + (ADIO_Offset)etype_size * offset;
 
 	start_off = off;
 	end_offset = start_off + bufsize - 1;
-	writebuf_off = start_off;
         /* write stripe size buffer each time */
 	writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
-        writebuf_len = (int) ADIOI_MIN(bufsize,
-                                       (writebuf_off / stripe_size + 1) *
-                                       stripe_size - writebuf_off);
+        writebuf_off = 0;
+        writebuf_len = 0;
 
         /* if atomicity is true, lock the region to be accessed */
 	if (fd->atomicity)
@@ -207,7 +227,8 @@
 
 	for (j = 0; j < count; j++) {
 	    for (i = 0; i < flat_buf->count; i++) {
-		userbuf_off = j * buftype_extent + flat_buf->indices[i];
+                userbuf_off = (ADIO_Offset)j * (ADIO_Offset)buftype_extent +
+                    flat_buf->indices[i];
 		req_off = off;
 		req_len = flat_buf->blocklens[i];
 		ADIOI_BUFFERED_WRITE_WITHOUT_READ
@@ -238,30 +259,36 @@
 	disp = fd->disp;
 
 	if (file_ptr_type == ADIO_INDIVIDUAL) {
-	    offset = fd->fp_ind;	/* in bytes */
-	    n_filetypes = -1;
-	    flag = 0;
-	    while (!flag) {
-		n_filetypes++;
-		for (i = 0; i < flat_file->count; i++) {
-		    if (disp + flat_file->indices[i] +
-			(ADIO_Offset) n_filetypes * filetype_extent +
-			flat_file->blocklens[i]	>= offset) {
-			st_index = i;
-			fwr_size = (int) (disp + flat_file->indices[i] +
-					  (ADIO_Offset) n_filetypes *
-					  filetype_extent +
-					  flat_file->blocklens[i] -
-					  offset);
-			flag = 1;
+            /* Wei-keng reworked type processing to be a bit more efficient */
+            offset       = fd->fp_ind - disp;
+            n_filetypes  = (offset - flat_file->indices[0]) / filetype_extent;
+            offset      -= (ADIO_Offset)n_filetypes * filetype_extent;
+            /* now offset is local to this extent */
+
+            /* find the block where offset is located, skip blocklens[i]==0 */
+            for (i=0; i<flat_file->count; i++) {
+                ADIO_Offset dist;
+                if (flat_file->blocklens[i] == 0) continue;
+                dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
+                /* fwr_size is from offset to the end of block i */
+                if (dist == 0) {
+                    i++;
+                    offset   = flat_file->indices[i];
+                    fwr_size = flat_file->blocklens[i];
 			break;
 		    }
+                if (dist > 0) {
+                    fwr_size = dist;
+                    break;
 		}
 	    }
-	} else {
-	    n_etypes_in_filetype = filetype_size / etype_size;
-	    n_filetypes = (int) (offset / n_etypes_in_filetype);
-	    etype_in_filetype = (int) (offset % n_etypes_in_filetype);
+            st_index = i;  /* starting index in flat_file->indices[] */
+            offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
+        }
+        else {
+            n_etypes_in_filetype = filetype_size/etype_size;
+            n_filetypes = offset / n_etypes_in_filetype;
+            etype_in_filetype = offset % n_etypes_in_filetype;
 	    size_in_filetype = etype_in_filetype * etype_size;
 
 	    sum = 0;
@@ -283,8 +310,11 @@
 
 	start_off = offset;
 
-	/* If the file bytes is actually contiguous, we do not need data sieve at all */
-	if (bufsize <= fwr_size) {
+        /* Wei-keng Liao:write request is within single flat_file
+         * contig block*/
+        /* this could happen, for example, with subarray types that are
+         * actually fairly contiguous */
+        if (buftype_is_contig && bufsize <= fwr_size) {
             req_off = start_off;
             req_len = bufsize;
             end_offset = start_off + bufsize - 1;
@@ -294,90 +324,122 @@
             writebuf_len = 0;
             userbuf_off = 0;
             ADIOI_BUFFERED_WRITE_WITHOUT_READ
-	} else {
+            /* write the buffer out finally */
+            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
+                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
+                             error_code);
+
+            if (file_ptr_type == ADIO_INDIVIDUAL) {
+                /* update MPI-IO file pointer to point to the first byte
+                 * that can be accessed in the fileview. */
+                fd->fp_ind = offset + bufsize;
+                if (bufsize == fwr_size) {
+                    do {
+                        st_index++;
+                        if (st_index == flat_file->count) {
+                            st_index = 0;
+                            n_filetypes++;
+                        }
+                    } while (flat_file->blocklens[st_index] == 0);
+                    fd->fp_ind = disp + flat_file->indices[st_index]
+                        + (ADIO_Offset)n_filetypes*filetype_extent;
+                }
+            }
+            fd->fp_sys_posn = -1;   /* set it to null. */
+#ifdef HAVE_STATUS_SET_BYTES
+            MPIR_Status_set_bytes(status, datatype, bufsize);
+#endif
+            ADIOI_Free(writebuf);
+            return;
+        }
+
 	    /* Calculate end_offset, the last byte-offset that will be accessed.
-	       e.g., if start_offset=0 and 100 bytes to be write, end_offset=99 */
+           e.g., if start_offset=0 and 100 bytes are to be written, end_offset=99 */
+
 	    st_fwr_size = fwr_size;
 	    st_n_filetypes = n_filetypes;
-	    i = 0;
+        i_offset = 0;
 	    j = st_index;
 	    off = offset;
 	    fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
-	    while (i < bufsize) {
-		i += fwr_size;
+        while (i_offset < bufsize) {
+            i_offset += fwr_size;
 		end_offset = off + fwr_size - 1;
 
-		if (j < (flat_file->count - 1))
-		    j++;
-		else {
-		    j = 0;
-		    n_filetypes++;
+            j = (j+1) % flat_file->count;
+            n_filetypes += (j == 0) ? 1 : 0;
+            while (flat_file->blocklens[j]==0) {
+                j = (j+1) % flat_file->count;
+                n_filetypes += (j == 0) ? 1 : 0;
 		}
 
 		off = disp + flat_file->indices[j] +
-		      (ADIO_Offset) n_filetypes * filetype_extent;
-		fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i);
+                n_filetypes*(ADIO_Offset)filetype_extent;
+            fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
 	    }
 
+/* if atomicity is true, lock the region to be accessed */
+        if (fd->atomicity)
+            ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
+
 	    writebuf_off = 0;
 	    writebuf_len = 0;
 	    writebuf = (char *) ADIOI_Malloc(stripe_size);
 	    memset(writebuf, -1, stripe_size);
-	    /* if atomicity is true, lock the region to be accessed */
-	    if (fd->atomicity)
-		ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);
 
 	    if (buftype_is_contig && !filetype_is_contig) {
-		/* contiguous in memory, noncontiguous in file. should be the most
+
+/* contiguous in memory, noncontiguous in file. should be the most
 		   common case. */
-		i = 0;
+
+            i_offset = 0;
 		j = st_index;
 		off = offset;
 		n_filetypes = st_n_filetypes;
 		fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
-		while (i < bufsize) {
+            while (i_offset < bufsize) {
 		    if (fwr_size) {
 			/* TYPE_UB and TYPE_LB can result in
 			   fwr_size = 0. save system call in such cases */
-			/*
-                        lseek(fd->fd_sys, off, SEEK_SET);
-			err = write(fd->fd_sys, ((char *) buf) + i, fwr_size);
-                        */
+                    /* lseek(fd->fd_sys, off, SEEK_SET);
+                       err = write(fd->fd_sys, ((char *) buf) + i_offset, fwr_size);*/
+
 			req_off = off;
 			req_len = fwr_size;
-			userbuf_off = i;
+                    userbuf_off = i_offset;
 			ADIOI_BUFFERED_WRITE
                     }
-		    i += fwr_size;
+                i_offset += fwr_size;
 
 		    if (off + fwr_size < disp + flat_file->indices[j] +
 		                         flat_file->blocklens[j] +
-			    (ADIO_Offset) n_filetypes * filetype_extent)
+                    n_filetypes*(ADIO_Offset)filetype_extent)
 		        off += fwr_size;
 		    /* did not reach end of contiguous block in filetype.
 		    no more I/O needed. off is incremented by fwr_size. */
 		    else {
-		        if (j < (flat_file->count - 1))
-			    j++;
-			else {
-			    j = 0;
-			    n_filetypes++;
+                    j = (j+1) % flat_file->count;
+                    n_filetypes += (j == 0) ? 1 : 0;
+                    while (flat_file->blocklens[j]==0) {
+                        j = (j+1) % flat_file->count;
+                        n_filetypes += (j == 0) ? 1 : 0;
 			}
 			off = disp + flat_file->indices[j] +
-		              (ADIO_Offset) n_filetypes * filetype_extent;
+                        n_filetypes*(ADIO_Offset)filetype_extent;
 			fwr_size = ADIOI_MIN(flat_file->blocklens[j],
-                                             bufsize - i);
+                                         bufsize-i_offset);
 		    }
 		}
-	    } else {
-		    /* noncontiguous in memory as well as in file */
+        }
+        else {
+/* noncontiguous in memory as well as in file */
+
 	        ADIOI_Flatten_datatype(datatype);
 	        flat_buf = ADIOI_Flatlist;
-	        while (flat_buf->type != datatype)
-		    flat_buf = flat_buf->next;
+            while (flat_buf->type != datatype) flat_buf = flat_buf->next;
 
 		k = num = buf_count = 0;
-		i = (int) (flat_buf->indices[0]);
+            i_offset = flat_buf->indices[0];
 		j = st_index;
 		off = offset;
 		n_filetypes = st_n_filetypes;
@@ -387,13 +449,12 @@
 		while (num < bufsize) {
 		    size = ADIOI_MIN(fwr_size, bwr_size);
 		    if (size) {
-		        /*
-                        lseek(fd->fd_sys, off, SEEK_SET);
-		         err = write(fd->fd_sys, ((char *) buf) + i, size);
-                        */
+                    /* lseek(fd->fd_sys, off, SEEK_SET);
+                       err = write(fd->fd_sys, ((char *) buf) + i_offset, size); */
+
 		        req_off = off;
 		        req_len = size;
-		        userbuf_off = i;
+                    userbuf_off = i_offset;
 		        ADIOI_BUFFERED_WRITE
                     }
 
@@ -401,28 +462,32 @@
 		    new_bwr_size = bwr_size;
 
 		    if (size == fwr_size) {
-		        /* reached end of contiguous block in file */
-		        if (j < (flat_file->count - 1)) {
-			    j++;
-                        } else {
-			    j = 0;
-			    n_filetypes++;
+/* reached end of contiguous block in file */
+                    j = (j+1) % flat_file->count;
+                    n_filetypes += (j == 0) ? 1 : 0;
+                    while (flat_file->blocklens[j]==0) {
+                        j = (j+1) % flat_file->count;
+                        n_filetypes += (j == 0) ? 1 : 0;
 			}
+
 			off = disp + flat_file->indices[j] +
-			      (ADIO_Offset) n_filetypes * filetype_extent;
+                        n_filetypes*(ADIO_Offset)filetype_extent;
+
                         new_fwr_size = flat_file->blocklens[j];
 			if (size != bwr_size) {
-			    i += size;
+                        i_offset += size;
 			    new_bwr_size -= size;
 			}
 		    }
+
 		    if (size == bwr_size) {
-		        /* reached end of contiguous block in memory */
-		        k = (k + 1) % flat_buf->count;
+/* reached end of contiguous block in memory */
+
+                    k = (k + 1)%flat_buf->count;
 		        buf_count++;
-		        i = (int) (buftype_extent *
-                                  (buf_count / flat_buf->count) +
-				  flat_buf->indices[k]);
+                    i_offset = (ADIO_Offset)buftype_extent *
+                        (ADIO_Offset)(buf_count/flat_buf->count) +
+                        flat_buf->indices[k];
 			new_bwr_size = flat_buf->blocklens[k];
 			if (size != fwr_size) {
 			    off += size;
@@ -437,28 +502,26 @@
 
 	    /* write the buffer out finally */
 	    if (writebuf_len) {
-	        ADIO_WriteContig(fd, writebuf, writebuf_len,
-	                         MPI_BYTE, ADIO_EXPLICIT_OFFSET,
+            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
+                             ADIO_EXPLICIT_OFFSET,
 	                         writebuf_off, &status1, error_code);
 		if (!(fd->atomicity))
 		    ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len);
-		if (*error_code != MPI_SUCCESS) {
-                    ADIOI_Free(writebuf);
-		    return;
-                }
+            if (*error_code != MPI_SUCCESS) return;
 	    }
 	    if (fd->atomicity)
-	        ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
-	}
+            ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset-start_off+1);
+
         ADIOI_Free(writebuf);
-	if (file_ptr_type == ADIO_INDIVIDUAL)
-	    fd->fp_ind = off;
+
+        if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
     }
+
     fd->fp_sys_posn = -1;	/* set it to null. */
 
 #ifdef HAVE_STATUS_SET_BYTES
     MPIR_Status_set_bytes(status, datatype, bufsize);
-    /* This is a temporary way of filling in status. The right way is to
+/* This is a temporary way of filling in status. The right way is to
     keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */
 #endif
 
diff -ubr mpich2-1.2.1/src/mpi/romio/adio/common/ad_fstype.c mpich2-1.2.1-new/src/mpi/romio/adio/common/ad_fstype.c
--- mpich2-1.2.1/src/mpi/romio/adio/common/ad_fstype.c	2009-06-01 16:57:33.000000000 -0600
+++ mpich2-1.2.1-new/src/mpi/romio/adio/common/ad_fstype.c	2009-12-29 13:20:21.000000000 -0700
@@ -346,18 +346,13 @@
     }
 # endif
 
-/*#if defined(LINUX) && defined(ROMIO_LUSTRE)*/
-#if 0
-    /* disable lustre auto-detection until we figure out why collective i/o
-     * broken */
-#ifdef ROMIO_LUSTRE
-#define LL_SUPER_MAGIC 0x0BD00BD0
+# ifdef ROMIO_LUSTRE
+#  define LL_SUPER_MAGIC 0x0BD00BD0
     if (fsbuf.f_type == LL_SUPER_MAGIC) {
 	*fstype = ADIO_LUSTRE;
 	return;
     }
 # endif
-#endif
 
 # ifdef PAN_KERNEL_FS_CLIENT_SUPER_MAGIC
     if (fsbuf.f_type == PAN_KERNEL_FS_CLIENT_SUPER_MAGIC) {
_______________________________________________
Lustre-discuss mailing list
Lustre-discuss@lists.lustre.org
http://lists.lustre.org/mailman/listinfo/lustre-discuss

Reply via email to