I just spent a little time creating a new "ibv" module for NetPIPE that runs on top of the userspace verbs I've been developing on the roland-uverbs branch. It is essentially a straight port of the current Mellanox VAPI "ib" module. The main changes stem from two differences: OpenIB doesn't support the non-standard "unsignaled receive" extension, and a completion event thread is no longer created automatically, so the module now starts its own.
I found several bugs in the verbs support while making this work, but it seems quite stable now, although I haven't tried all option combinations. I also have not had a chance to compare Mellanox VAPI and OpenIB verbs performance on identical hardware -- it would be very useful to see this comparison on a variety of systems. The new ibv module is contained in the patch included below. Thanks, Roland --- NetPIPE_3.6.2.orig/makefile 2004-06-09 12:46:35.000000000 -0700 +++ NetPIPE_3.6.2/makefile 2005-03-15 13:58:08.000000000 -0800 @@ -229,6 +229,10 @@ -DINFINIBAND -DTCP -I $(VAPI_INC) -L $(VAPI_LIB) \ -lmpga -lvapi -lpthread +ibv: $(SRC)/ibv.c $(SRC)/netpipe.c $(SRC)/netpipe.h + $(CC) $(CFLAGS) $(SRC)/ibv.c $(SRC)/netpipe.c -o NPibv \ + -DOPENIB -DTCP -libverbs + atoll: $(SRC)/atoll.c $(SRC)/netpipe.c $(SRC)/netpipe.h $(CC) $(CFLAGS) -DATOLL $(SRC)/netpipe.c \ $(SRC)/atoll.c -o NPatoll \ --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ NetPIPE_3.6.2/src/ibv.c 2005-03-15 13:30:03.000000000 -0800 @@ -0,0 +1,1072 @@ +/*****************************************************************************/ +/* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */ +/* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */ +/* */ +/* This program is free software; you can redistribute it and/or modify */ +/* it under the terms of the GNU General Public License as published by */ +/* the Free Software Foundation. You should have received a copy of the */ +/* GNU General Public License along with this program; if not, write to the */ +/* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
*/ +/* */ +/* ibv.c ---- Infiniband module for OpenIB verbs */ +/*****************************************************************************/ + +#define USE_VOLATILE_RPTR /* needed for polling on last byte of recv buffer */ +#include "netpipe.h" +#include <stdio.h> +#include <getopt.h> +#include <pthread.h> + +/* Debugging output macro; change "#if 0" to "#if 1" to log to the .iblog file opened in Setup */ + +FILE* logfile; + +#if 0 +#define LOGPRINTF(_format, _aa...) do { fprintf(logfile, "%s: " _format, __func__ , ##_aa); fflush(logfile); } while (0) +#else +#define LOGPRINTF(_format, _aa...) +#endif + +/* Header files needed for Infiniband */ + +#include <infiniband/verbs.h> + +/* Global vars */ + +static struct ibv_device *hca; +static struct ibv_context *ctx; +static struct ibv_port_attr hca_port; +static int port_num; +static uint16_t lid; +static uint16_t d_lid; +static struct ibv_pd *pd_hndl; +static int num_cqe; +static int act_num_cqe; +static struct ibv_cq *s_cq_hndl; +static struct ibv_cq *r_cq_hndl; +static struct ibv_mr *s_mr_hndl; +static struct ibv_mr *r_mr_hndl; +static struct ibv_qp_init_attr qp_init_attr; +static struct ibv_qp *qp_hndl; +static uint32_t d_qp_num; +static struct ibv_qp_attr qp_attr; +static struct ibv_wc wc; +static int max_wq=50000; +static void* remote_address; +static uint32_t remote_key; +static volatile int receive_complete; +static pthread_t thread; + +/* Function definitions */ + +void Init(ArgStruct *p, int* pargc, char*** pargv) +{ + /* Set defaults + */ + p->prot.ib_mtu = IBV_MTU_1024; /* 1024 Byte MTU */ + p->prot.commtype = NP_COMM_RDMAWRITE; /* Use RDMA write communications */ + p->prot.comptype = NP_COMP_LOCALPOLL; /* Use local polling for completion */ + p->tr = 0; /* I am not the transmitter */ + p->rcv = 1; /* I am the receiver */ +} + +void Setup(ArgStruct *p) +{ + + int one = 1; + int sockfd; + struct sockaddr_in *lsin1, *lsin2; /* ptr to sockaddr_in in ArgStruct */ + char *host; + struct hostent *addr; + struct protoent *proto; + int send_size, recv_size, sizeofint = sizeof(int); + struct
sigaction sigact1; + char logfilename[80]; + + /* Sanity check */ + if( p->prot.commtype == NP_COMM_RDMAWRITE && + p->prot.comptype != NP_COMP_LOCALPOLL ) { + fprintf(stderr, "Error, RDMA Write may only be used with local polling.\n"); + fprintf(stderr, "Try using RDMA Write With Immediate Data with vapi polling\n"); + fprintf(stderr, "or event completion\n"); + exit(-1); + } + + if( p->prot.commtype != NP_COMM_RDMAWRITE && + p->prot.comptype == NP_COMP_LOCALPOLL ) { + fprintf(stderr, "Error, local polling may only be used with RDMA Write.\n"); + fprintf(stderr, "Try using vapi polling or event completion\n"); + exit(-1); + } + + /* Open log file */ + snprintf(logfilename, sizeof(logfilename), ".iblog%d", 1 - p->tr); + logfile = fopen(logfilename, "w"); + + host = p->host; /* copy ptr to hostname */ + + lsin1 = &(p->prot.sin1); + lsin2 = &(p->prot.sin2); + + bzero((char *) lsin1, sizeof(*lsin1)); + bzero((char *) lsin2, sizeof(*lsin2)); + + if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + printf("NetPIPE: can't open stream socket! errno=%d\n", errno); + exit(-4); + } + + if(!(proto = getprotobyname("tcp"))){ + printf("NetPIPE: protocol 'tcp' unknown!\n"); + exit(555); + } + + if (p->tr){ /* if client i.e., Sender */ + + + if (atoi(host) > 0) { /* Numerical IP address */ + lsin1->sin_family = AF_INET; + lsin1->sin_addr.s_addr = inet_addr(host); + + } else { + + if ((addr = gethostbyname(host)) == NULL){ + printf("NetPIPE: invalid hostname '%s'\n", host); + exit(-5); + } + + lsin1->sin_family = addr->h_addrtype; + bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length); + } + + lsin1->sin_port = htons(p->port); + + } else { /* we are the receiver (server) */ + + bzero((char *) lsin1, sizeof(*lsin1)); + lsin1->sin_family = AF_INET; + lsin1->sin_addr.s_addr = htonl(INADDR_ANY); + lsin1->sin_port = htons(p->port); + + if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){ + printf("NetPIPE: server: bind on local address failed!
errno=%d", errno); + exit(-6); + } + + } + + if(p->tr) + p->commfd = sockfd; + else + p->servicefd = sockfd; + + + + /* Establish tcp connections */ + + establish(p); + + /* Initialize Infiniband through the OpenIB verbs */ + + if(initIB(p) == -1) { + CleanUp(p); + exit(-1); + } +} + +void event_handler(struct ibv_cq *cq); + +void *EventThread(void *unused) +{ + struct ibv_cq *cq; + void *data; + + while (1) { + if (ibv_get_cq_event(ctx, 0, &cq, &data)) { + fprintf(stderr, "Failed to get CQ event\n"); + exit(-1); + } + event_handler(cq); + } +} + +int initIB(ArgStruct *p) +{ + struct dlist *dev_list; + int ret; + + dev_list = ibv_get_devices(); + dlist_start(dev_list); + hca = dlist_next(dev_list); + if (!hca) { + fprintf(stderr, "Couldn't find any InfiniBand devices\n"); + return -1; + } else { + LOGPRINTF("Found Infiniband HCA %s\n", ibv_get_device_name(hca)); + } + + ctx = ibv_open_device(hca); + if (!ctx) { + fprintf(stderr, "Couldn't create InfiniBand context\n"); + return -1; + } else { + LOGPRINTF("Opened Infiniband HCA %s\n", ibv_get_device_name(hca)); + } + + /* Get HCA properties */ + + port_num=1; + ret = ibv_query_port(ctx, port_num, &hca_port); + if(ret) { + fprintf(stderr, "Error querying Infiniband HCA\n"); + return -1; + } else { + LOGPRINTF("Queried Infiniband HCA\n"); + } + lid = hca_port.lid; + LOGPRINTF(" lid = %d\n", lid); + + + /* Allocate Protection Domain */ + + pd_hndl = ibv_alloc_pd(ctx); + if(!pd_hndl) { + fprintf(stderr, "Error allocating PD\n"); + return -1; + } else { + LOGPRINTF("Allocated Protection Domain\n"); + } + + + /* Create send completion queue */ + + num_cqe = 30000; /* Requested number of completion q elements */ + s_cq_hndl = ibv_create_cq(ctx, num_cqe, NULL); + if(!s_cq_hndl) { + fprintf(stderr, "Error creating send CQ\n"); + return -1; + } else { + act_num_cqe = s_cq_hndl->cqe; + LOGPRINTF("Created Send Completion Queue with %d elements\n", act_num_cqe); + } + + + /* Create recv completion queue */ + + num_cqe = 20000; /* Requested
number of completion q elements */ + r_cq_hndl = ibv_create_cq(ctx, num_cqe, NULL); + if(!r_cq_hndl) { + fprintf(stderr, "Error creating recv CQ\n"); + return -1; + } else { + act_num_cqe = r_cq_hndl->cqe; + LOGPRINTF("Created Recv Completion Queue with %d elements\n", act_num_cqe); + } + + + /* Placeholder for MR */ + + + /* Create Queue Pair */ + + qp_init_attr.cap.max_recv_wr = max_wq; /* Max outstanding WR on RQ */ + qp_init_attr.cap.max_send_wr = max_wq; /* Max outstanding WR on SQ */ + qp_init_attr.cap.max_recv_sge = 1; /* Max scatter/gather entries on RQ */ + qp_init_attr.cap.max_send_sge = 1; /* Max scatter/gather entries on SQ */ + qp_init_attr.recv_cq = r_cq_hndl; /* CQ handle for RQ */ + qp_init_attr.send_cq = s_cq_hndl; /* CQ handle for SQ */ + qp_init_attr.sq_sig_all = 0; /* Signalling type */ + qp_init_attr.qp_type = IBV_QPT_RC; /* Transmission type */ + + qp_hndl = ibv_create_qp(pd_hndl, &qp_init_attr); + if(!qp_hndl) { + fprintf(stderr, "Error creating Queue Pair\n"); + return -1; + } else { + LOGPRINTF("Created Queue Pair\n"); + } + + + /* Exchange lid and qp_num with other node (sent as raw host-byte-order values) */ + + if( write(p->commfd, &lid, sizeof(lid) ) != sizeof(lid) ) { + fprintf(stderr, "Failed to send lid over socket\n"); + return -1; + } + if( write(p->commfd, &qp_hndl->qp_num, sizeof(qp_hndl->qp_num) ) != sizeof(qp_hndl->qp_num) ) { + fprintf(stderr, "Failed to send qpnum over socket\n"); + return -1; + } + if( read(p->commfd, &d_lid, sizeof(d_lid) ) != sizeof(d_lid) ) { + fprintf(stderr, "Failed to read lid from socket\n"); + return -1; + } + if( read(p->commfd, &d_qp_num, sizeof(d_qp_num) ) != sizeof(d_qp_num) ) { + fprintf(stderr, "Failed to read qpnum from socket\n"); + return -1; + } + + LOGPRINTF("Local: lid=%d qp_num=%d Remote: lid=%d qp_num=%d\n", + lid, qp_hndl->qp_num, d_lid, d_qp_num); + + + /* Bring up Queue Pair */ + + /******* INIT state ******/ + + qp_attr.qp_state = IBV_QPS_INIT; + qp_attr.pkey_index = 0; + qp_attr.port_num = port_num; +
qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; + + ret = ibv_modify_qp(qp_hndl, &qp_attr, + IBV_QP_STATE | + IBV_QP_PKEY_INDEX | + IBV_QP_PORT | + IBV_QP_ACCESS_FLAGS); + if(ret) { + fprintf(stderr, "Error modifying QP to INIT\n"); + return -1; + } + + LOGPRINTF("Modified QP to INIT\n"); + + /******* RTR (Ready-To-Receive) state *******/ + + qp_attr.qp_state = IBV_QPS_RTR; + qp_attr.max_dest_rd_atomic = 1; + qp_attr.dest_qp_num = d_qp_num; + qp_attr.ah_attr.sl = 0; + qp_attr.ah_attr.is_global = 0; + qp_attr.ah_attr.dlid = d_lid; + qp_attr.ah_attr.static_rate = 0; + qp_attr.ah_attr.src_path_bits = 0; + qp_attr.ah_attr.port_num = port_num; + qp_attr.path_mtu = p->prot.ib_mtu; + qp_attr.rq_psn = 0; + qp_attr.pkey_index = 0; + qp_attr.min_rnr_timer = 5; + + ret = ibv_modify_qp(qp_hndl, &qp_attr, + IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | + IBV_QP_MIN_RNR_TIMER); + + if(ret) { + fprintf(stderr, "Error modifying QP to RTR\n"); + return -1; + } + + LOGPRINTF("Modified QP to RTR\n"); + + /* Sync before going to RTS state */ + Sync(p); + + /******* RTS (Ready-to-Send) state *******/ + + qp_attr.qp_state = IBV_QPS_RTS; + qp_attr.sq_psn = 0; + qp_attr.timeout = 31; + qp_attr.retry_cnt = 1; + qp_attr.rnr_retry = 1; + qp_attr.max_rd_atomic = 1; + + ret = ibv_modify_qp(qp_hndl, &qp_attr, + IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); + + if(ret) { + fprintf(stderr, "Error modifying QP to RTS\n"); + return -1; + } + + LOGPRINTF("Modified QP to RTS\n"); + + /* If using event completion, start the event thread and request the initial notification */ + if( p->prot.comptype == NP_COMP_EVENT ) { + if (pthread_create(&thread, NULL, EventThread, NULL)) { + fprintf(stderr, "Couldn't start event thread\n"); + return -1; + } + ibv_req_notify_cq(r_cq_hndl, 0); + } + + return 0; +} + +int finalizeIB(ArgStruct *p) +{ + int ret; +
LOGPRINTF("Finalizing IB stuff\n"); + + if(qp_hndl) { + LOGPRINTF("Destroying QP\n"); + ret = ibv_destroy_qp(qp_hndl); + if(ret) { + fprintf(stderr, "Error destroying Queue Pair\n"); + } + } + + if(r_cq_hndl) { + LOGPRINTF("Destroying Recv CQ\n"); + ret = ibv_destroy_cq(r_cq_hndl); + if(ret) { + fprintf(stderr, "Error destroying recv CQ\n"); + } + } + + if(s_cq_hndl) { + LOGPRINTF("Destroying Send CQ\n"); + ret = ibv_destroy_cq(s_cq_hndl); + if(ret) { + fprintf(stderr, "Error destroying send CQ\n"); + } + } + + /* Check memory registrations just in case user bailed out */ + if(s_mr_hndl) { + LOGPRINTF("Deregistering send buffer\n"); + ret = ibv_dereg_mr(s_mr_hndl); + if(ret) { + fprintf(stderr, "Error deregistering send mr\n"); + } + } + + if(r_mr_hndl) { + LOGPRINTF("Deregistering recv buffer\n"); + ret = ibv_dereg_mr(r_mr_hndl); + if(ret) { + fprintf(stderr, "Error deregistering recv mr\n"); + } + } + + if(pd_hndl) { + LOGPRINTF("Deallocating PD\n"); + ret = ibv_dealloc_pd(pd_hndl); + if(ret) { + fprintf(stderr, "Error deallocating PD\n"); + } + } + + /* Finally close the device context opened in initIB */ + + if(ctx) { + LOGPRINTF("Releasing HCA\n"); + ret = ibv_close_device(ctx); + if(ret) { + fprintf(stderr, "Error releasing HCA\n"); + } + } + + return 0; +} + +void event_handler(struct ibv_cq *cq) +{ + int ret; + struct ibv_wc ev_wc; /* local: the main thread polls into the global wc */ + + while(1) { + ret = ibv_poll_cq(cq, 1, &ev_wc); + + if(ret == 0) { + LOGPRINTF("Empty completion queue, requesting next notification\n"); + ibv_req_notify_cq(cq, 0); + /* Re-poll after re-arming: a completion that arrived before the re-arm raises no event */ + ret = ibv_poll_cq(cq, 1, &ev_wc); + if(ret == 0) return; + } + if(ret < 0) { + fprintf(stderr, "Error in event_handler, polling cq\n"); + exit(-1); + } else if(ev_wc.status != IBV_WC_SUCCESS) { + fprintf(stderr, "Error in event_handler, on returned work completion " + "status: %d\n", ev_wc.status); + exit(-1); + } + + LOGPRINTF("Retrieved work completion\n"); + + /* For ping-pong mode at least, this check shouldn't be needed for + * normal operation, but it will help catch any bugs with multiple + * sends coming through
when we're only expecting one. + */ + if(receive_complete == 1) { + + while(receive_complete != 0) sched_yield(); + + } + + receive_complete = 1; + + } + +} + +static int +readFully(int fd, void *obuf, int len) +{ + int bytesLeft = len; + char *buf = (char *) obuf; + int bytesRead = 0; + + while (bytesLeft > 0 && + (bytesRead = read(fd, (void *) buf, bytesLeft)) > 0) + { + bytesLeft -= bytesRead; + buf += bytesRead; + } + if (bytesRead <= 0) + return bytesRead; + return len; +} + +void Sync(ArgStruct *p) +{ + char s[] = "SyncMe"; + char response[7]; + + if (write(p->commfd, s, strlen(s)) != (int)strlen(s) || + readFully(p->commfd, response, strlen(s)) != (int)strlen(s)) + { + perror("NetPIPE: error writing or reading synchronization string"); + exit(3); + } + if (strncmp(s, response, strlen(s))) + { + fprintf(stderr, "NetPIPE: Synchronization string incorrect!\n"); + exit(3); + } +} + +void PrepareToReceive(ArgStruct *p) +{ + int ret; /* Return code */ + struct ibv_recv_wr rr = {0}; /* Receive request; zeroed so wr_id is defined */ + struct ibv_recv_wr *bad_wr; + struct ibv_sge sg_entry; /* Scatter/Gather list - holds buff addr */ + + /* We don't need to post a receive if doing RDMA write with local polling */ + + if( p->prot.commtype == NP_COMM_RDMAWRITE && + p->prot.comptype == NP_COMP_LOCALPOLL ) + return; + + rr.num_sge = 1; + rr.sg_list = &sg_entry; + rr.next = NULL; + + sg_entry.lkey = r_mr_hndl->lkey; + sg_entry.length = p->bufflen; + sg_entry.addr = (uintptr_t)p->r_ptr; + + ret = ibv_post_recv(qp_hndl, &rr, &bad_wr); + if(ret) { + fprintf(stderr, "Error posting recv request\n"); + CleanUp(p); + exit(-1); + } else { + LOGPRINTF("Posted recv request\n"); + } + + /* In event mode, clear the receive flag; event_handler (run by + * the event thread) sets it again once this receive's completion + * is polled from the receive CQ.
+ */ + if( p->prot.comptype == NP_COMP_EVENT ) { + receive_complete = 0; + } +} + +void SendData(ArgStruct *p) +{ + int ret; /* Return code */ + struct ibv_send_wr sr = {0}; /* Send request; zeroed so wr_id/imm_data are defined */ + struct ibv_send_wr *bad_wr; + struct ibv_sge sg_entry; /* Scatter/Gather list - holds buff addr */ + + /* Fill in send request struct */ + + if(p->prot.commtype == NP_COMM_SENDRECV) { + sr.opcode = IBV_WR_SEND; + LOGPRINTF("Doing regular send\n"); + } else if(p->prot.commtype == NP_COMM_SENDRECV_WITH_IMM) { + sr.opcode = IBV_WR_SEND_WITH_IMM; + LOGPRINTF("Doing regular send with imm\n"); + } else if(p->prot.commtype == NP_COMM_RDMAWRITE) { + sr.opcode = IBV_WR_RDMA_WRITE; + sr.wr.rdma.remote_addr = (uintptr_t)((char *)remote_address + (p->s_ptr - p->s_buff)); + sr.wr.rdma.rkey = remote_key; + LOGPRINTF("Doing RDMA write (raddr=%p)\n", (void *)(uintptr_t)sr.wr.rdma.remote_addr); + } else if(p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM) { + sr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + sr.wr.rdma.remote_addr = (uintptr_t)((char *)remote_address + (p->s_ptr - p->s_buff)); + sr.wr.rdma.rkey = remote_key; + LOGPRINTF("Doing RDMA write with imm (raddr=%p)\n", (void *)(uintptr_t)sr.wr.rdma.remote_addr); + } else { + fprintf(stderr, "Error, invalid communication type in SendData\n"); + exit(-1); + } + + sr.send_flags = 0; /* Unsignaled (QP has sq_sig_all = 0): successful data sends produce no send CQE */ + + sr.num_sge = 1; + sr.sg_list = &sg_entry; + sr.next = NULL; + + sg_entry.lkey = s_mr_hndl->lkey; /* Local memory region key */ + sg_entry.length = p->bufflen; + sg_entry.addr = (uintptr_t)p->s_ptr; + + ret = ibv_post_send(qp_hndl, &sr, &bad_wr); + if(ret) { + fprintf(stderr, "Error posting send request\n"); + CleanUp(p); + exit(-1); + } else { + LOGPRINTF("Posted send request\n"); + } + +} + +void RecvData(ArgStruct *p) +{ + int ret; + + /* Busy wait for incoming data */ + + LOGPRINTF("Receiving at buffer address %p\n", p->r_ptr); + + /* + * Unsignaled receives are not supported, so we must always poll the + * CQ, except when using RDMA writes.
+ */ + if( p->prot.commtype == NP_COMM_RDMAWRITE ) { + + /* Poll for receive completion locally on the receive data */ + + LOGPRINTF("Waiting for last byte of data to arrive\n"); + + while(p->r_ptr[p->bufflen-1] != 'a' + (p->cache ? 1 - p->tr : 1) ) + { + /* BUSY WAIT -- this should be fine since we + * declared r_ptr with volatile qualifier */ + } + + /* Reset last byte */ + p->r_ptr[p->bufflen-1] = 'a' + (p->cache ? p->tr : 0); + + LOGPRINTF("Received all of data\n"); + + } else if( p->prot.comptype != NP_COMP_EVENT ) { + + /* Poll for receive completion using ibv_poll_cq */ + + LOGPRINTF("Polling completion queue for VAPI work completion\n"); + + ret = 0; + while(ret == 0) + ret = ibv_poll_cq(r_cq_hndl, 1, &wc); + + if(ret < 0) { + fprintf(stderr, "Error in RecvData, polling for completion\n"); + exit(-1); + } + + if(wc.status != IBV_WC_SUCCESS) { + fprintf(stderr, "Error in status of returned completion: %d\n", + wc.status); + exit(-1); + } + + LOGPRINTF("Retrieved successful completion\n"); + + } else if( p->prot.comptype == NP_COMP_EVENT ) { + + /* Instead of polling directly on data or the completion queue, + * let the event thread's handler set a flag when the receive + * completes, and poll on that instead. Could try using semaphore here + * as well to eliminate busy polling + */ + + LOGPRINTF("Polling receive flag\n"); + + while( receive_complete == 0 ) + { + /* BUSY WAIT */ + } + + /* If in prepost-burst mode, we won't be calling PrepareToReceive + * between ping-pongs, so we need to reset the receive_complete + * flag here.
+ */ + if( p->preburst ) receive_complete = 0; + + LOGPRINTF("Receive completed\n"); + } +} + +/* Reset is used after a trial to empty the work request queues so we + have enough room for the next trial to run */ +void Reset(ArgStruct *p) +{ + + int ret; /* Return code */ + struct ibv_send_wr sr = {0}; /* Send request */ + struct ibv_send_wr *bad_sr; + struct ibv_recv_wr rr = {0}; /* Recv request */ + struct ibv_recv_wr *bad_rr; + + /* If comptype is event, then we'll use event handler to detect receive, + * so initialize receive_complete flag + */ + if(p->prot.comptype == NP_COMP_EVENT) receive_complete = 0; + + /* Prepost receive */ + rr.num_sge = 0; + rr.next = NULL; + + LOGPRINTF("Posting recv request in Reset\n"); + ret = ibv_post_recv(qp_hndl, &rr, &bad_rr); + if(ret) { + fprintf(stderr, " Error posting recv request\n"); + CleanUp(p); + exit(-1); + } + + /* Make sure both nodes have preposted receives */ + Sync(p); + + /* Post Send */ + sr.opcode = IBV_WR_SEND; + sr.send_flags = IBV_SEND_SIGNALED; + sr.num_sge = 0; + sr.next = NULL; + + LOGPRINTF("Posting send request \n"); + ret = ibv_post_send(qp_hndl, &sr, &bad_sr); + if(ret) { + fprintf(stderr, " Error posting send request in Reset\n"); + exit(-1); + } + + LOGPRINTF("Polling for completion of send request\n"); + ret = 0; + while(ret == 0) + ret = ibv_poll_cq(s_cq_hndl, 1, &wc); + + if(ret < 0) { + fprintf(stderr, "Error polling CQ for send in Reset\n"); + exit(-1); + } + if(wc.status != IBV_WC_SUCCESS) { + fprintf(stderr, " Error in completion status: %d\n", + wc.status); + exit(-1); + } + + LOGPRINTF("Status of send completion: %d\n", wc.status); + + if(p->prot.comptype == NP_COMP_EVENT) { + /* If using event completion, the event handler will set receive_complete + * when it gets the completion event.
+ */ + LOGPRINTF("Waiting for receive_complete flag\n"); + while(receive_complete == 0) { /* BUSY WAIT */ } + } else { + LOGPRINTF("Polling for completion of recv request\n"); + ret = 0; + while(ret == 0) + ret = ibv_poll_cq(r_cq_hndl, 1, &wc); + + if(ret < 0) { + fprintf(stderr, "Error polling CQ for recv in Reset\n"); + exit(-1); + } + if(wc.status != IBV_WC_SUCCESS) { + fprintf(stderr, " Error in completion status: %d\n", + wc.status); + exit(-1); + } + + LOGPRINTF("Status of recv completion: %d\n", wc.status); + } + LOGPRINTF("Done with reset\n"); +} + +void SendTime(ArgStruct *p, double *t) +{ + uint32_t ltime, ntime; + + /* + Multiply the number of seconds by 1e6 to get time in microseconds + and convert value to an unsigned 32-bit integer. + */ + ltime = (uint32_t)(*t * 1.e6); + + /* Send time in network order */ + ntime = htonl(ltime); + if (write(p->commfd, (char *)&ntime, sizeof(uint32_t)) < 0) + { + printf("NetPIPE: write failed in SendTime: errno=%d\n", errno); + exit(301); + } +} + +void RecvTime(ArgStruct *p, double *t) +{ + uint32_t ltime, ntime; + int bytesRead; + + bytesRead = readFully(p->commfd, (void *)&ntime, sizeof(uint32_t)); + if (bytesRead < 0) + { + printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno); + exit(302); + } + else if (bytesRead != sizeof(uint32_t)) + { + fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n", + bytesRead); + exit(303); + } + ltime = ntohl(ntime); + + /* Result is ltime (in microseconds) divided by 1.0e6 to get seconds */ + *t = (double)ltime / 1.0e6; +} + +void SendRepeat(ArgStruct *p, int rpt) +{ + uint32_t lrpt, nrpt; + + lrpt = rpt; + /* Send repeat count as a 32-bit value in network order */ + nrpt = htonl(lrpt); + if (write(p->commfd, (void *) &nrpt, sizeof(uint32_t)) < 0) + { + printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno); + exit(304); + } +} + +void RecvRepeat(ArgStruct *p, int *rpt) +{ + uint32_t lrpt, nrpt; + int bytesRead; + + bytesRead = readFully(p->commfd, (void
*)&nrpt, sizeof(uint32_t)); + if (bytesRead < 0) + { + printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno); + exit(305); + } + else if (bytesRead != sizeof(uint32_t)) + { + fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n", + bytesRead); + exit(306); + } + lrpt = ntohl(nrpt); + + *rpt = lrpt; +} + +void establish(ArgStruct *p) +{ + socklen_t clen; + int one = 1; + struct protoent; + + clen = sizeof(p->prot.sin2); + if(p->tr){ + if(connect(p->commfd, (struct sockaddr *) &(p->prot.sin1), + sizeof(p->prot.sin1)) < 0){ + printf("Client: Cannot Connect! errno=%d\n",errno); + exit(-10); + } + } + else { + /* SERVER */ + listen(p->servicefd, 5); + p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2), + &clen); + + if(p->commfd < 0){ + printf("Server: Accept Failed! errno=%d\n",errno); + exit(-12); + } + } +} + +void CleanUp(ArgStruct *p) +{ + char quit[5] = "QUIT"; /* writable: the read()s below store into it */ + if (p->tr) + { + write(p->commfd,quit, 5); + read(p->commfd, quit, 5); + close(p->commfd); + } + else + { + read(p->commfd,quit, 5); + write(p->commfd,quit,5); + close(p->commfd); + close(p->servicefd); + } + + finalizeIB(p); +} + + +void AfterAlignmentInit(ArgStruct *p) +{ + int bytesRead; + + /* Exchange buffer pointers and remote infiniband keys if doing rdma. Do + * the exchange in this function because this will happen after any + * memory alignment is done, which is important for getting the + * correct remote address.
+ */ + if( p->prot.commtype == NP_COMM_RDMAWRITE || + p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM ) { + + /* Send my receive buffer address (raw pointer bytes, so both nodes + * must share pointer size and byte order) */ + if(write(p->commfd, (void *)&p->r_buff, sizeof(void*)) < 0) { + perror("NetPIPE: write of buffer address failed in AfterAlignmentInit"); + exit(-1); + } + + LOGPRINTF("Sent buffer address: %p\n", p->r_buff); + + /* Send my remote key for accessing + * my remote buffer via IB RDMA + */ + if(write(p->commfd, (void *)&r_mr_hndl->rkey, sizeof(uint32_t)) < 0) { + perror("NetPIPE: write of remote key failed in AfterAlignmentInit"); + exit(-1); + } + + LOGPRINTF("Sent remote key: %u\n", r_mr_hndl->rkey); + + /* Read the sent data + */ + bytesRead = readFully(p->commfd, (void *)&remote_address, sizeof(void*)); + if (bytesRead < 0) { + perror("NetPIPE: read of buffer address failed in AfterAlignmentInit"); + exit(-1); + } else if (bytesRead != sizeof(void*)) { + perror("NetPIPE: partial read of buffer address in AfterAlignmentInit"); + exit(-1); + } + + LOGPRINTF("Received remote address from other node: %p\n", remote_address); + + bytesRead = readFully(p->commfd, (void *)&remote_key, sizeof(uint32_t)); + if (bytesRead < 0) { + perror("NetPIPE: read of remote key failed in AfterAlignmentInit"); + exit(-1); + } else if (bytesRead != sizeof(uint32_t)) { + perror("NetPIPE: partial read of remote key in AfterAlignmentInit"); + exit(-1); + } + + LOGPRINTF("Received remote key from other node: %u\n", remote_key); + + } +} + + +void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset) +{ + /* Allocate buffers */ + + p->r_buff = malloc(bufflen+MAX(soffset,roffset)); + if(p->r_buff == NULL) { + fprintf(stderr, "Error malloc'ing buffer\n"); + exit(-1); + } + + if(p->cache) { + + /* Infiniband spec says we can register same memory region + * more than once, so just copy buffer address. We will register + * the same buffer twice with Infiniband.
+ */ + p->s_buff = p->r_buff; + + } else { + + p->s_buff = malloc(bufflen+soffset); + if(p->s_buff == NULL) { + fprintf(stderr, "Error malloc'ing buffer\n"); + exit(-1); + } + + } + + /* Register buffers with Infiniband */ + + r_mr_hndl = ibv_reg_mr(pd_hndl, p->r_buff, bufflen + MAX(soffset, roffset), + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); + if(!r_mr_hndl) + { + fprintf(stderr, "Error registering recv buffer\n"); + exit(-1); + } + else + { + LOGPRINTF("Registered Recv Buffer\n"); + } + + s_mr_hndl = ibv_reg_mr(pd_hndl, p->s_buff, bufflen+soffset, IBV_ACCESS_LOCAL_WRITE); + if(!s_mr_hndl) { + fprintf(stderr, "Error registering send buffer\n"); + exit(-1); + } else { + LOGPRINTF("Registered Send Buffer\n"); + } + +} +void FreeBuff(char *buff1, char *buff2) +{ + int ret; + + if(s_mr_hndl) { + LOGPRINTF("Deregistering send buffer\n"); + ret = ibv_dereg_mr(s_mr_hndl); + if(ret) { + fprintf(stderr, "Error deregistering send mr\n"); + } else { + s_mr_hndl = NULL; + } + } + + if(r_mr_hndl) { + LOGPRINTF("Deregistering recv buffer\n"); + ret = ibv_dereg_mr(r_mr_hndl); + if(ret) { + fprintf(stderr, "Error deregistering recv mr\n"); + } else { + r_mr_hndl = NULL; + } + } + + if(buff1 != NULL) + free(buff1); + + if(buff2 != NULL) + free(buff2); +} + --- NetPIPE_3.6.2.orig/src/netpipe.c 2004-06-22 12:38:41.000000000 -0700 +++ NetPIPE_3.6.2/src/netpipe.c 2005-03-15 12:36:44.000000000 -0800 @@ -142,7 +142,7 @@ case 's': streamopt = 1; printf("Streaming in one direction only.\n\n"); -#if defined(TCP) && ! defined(INFINIBAND) +#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB) printf("Sockets are reset between trials to avoid\n"); printf("degradation from a collapsing window size.\n\n"); #endif @@ -168,7 +168,7 @@ case 'u': end = atoi(optarg); break; -#if defined(TCP) && ! defined(INFINIBAND) +#if defined(TCP) && !
defined(INFINIBAND) && !defined(OPENIB) case 'b': /* -b # resets the buffer size, -b 0 keeps system defs */ args.prot.sndbufsz = args.prot.rcvbufsz = atoi(optarg); break; @@ -178,7 +178,7 @@ /* end will be maxed at sndbufsz+rcvbufsz */ printf("Passing data in both directions simultaneously.\n"); printf("Output is for the combined bandwidth.\n"); -#if defined(TCP) && ! defined(INFINIBAND) +#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB) printf("The socket buffer size limits the maximum test size.\n\n"); #endif if( streamopt ) { @@ -270,7 +270,29 @@ exit(-1); } break; +#endif + +#if defined(OPENIB) + case 'm': switch(atoi(optarg)) { + case 256: args.prot.ib_mtu = IBV_MTU_256; + break; + case 512: args.prot.ib_mtu = IBV_MTU_512; + break; + case 1024: args.prot.ib_mtu = IBV_MTU_1024; + break; + case 2048: args.prot.ib_mtu = IBV_MTU_2048; + break; + case 4096: args.prot.ib_mtu = IBV_MTU_4096; + break; + default: + fprintf(stderr, "Invalid MTU size, must be one of " + "256, 512, 1024, 2048, 4096\n"); + exit(-1); + } + break; +#endif +#if defined(OPENIB) || defined(INFINIBAND) case 't': if( !strcmp(optarg, "send_recv") ) { printf("Using Send/Receive communications\n"); args.prot.commtype = NP_COMM_SENDRECV; @@ -317,7 +339,7 @@ case 'n': nrepeat_const = atoi(optarg); break; -#if defined(TCP) && ! defined(INFINIBAND) +#if defined(TCP) && ! defined(INFINIBAND) && !defined(OPENIB) case 'r': args.reset_conn = 1; printf("Resetting connection after every trial\n"); break; @@ -331,7 +353,7 @@ #endif /* ! defined TCGMSG */ -#if defined(INFINIBAND) +#if defined(OPENIB) || defined(INFINIBAND) asyncReceive = 1; fprintf(stderr, "Preposting asynchronous receives (required for Infiniband)\n"); if(args.bidir && ( @@ -377,7 +399,7 @@ end = args.upper; if( args.tr ) { printf("The upper limit is being set to %d Bytes\n", end); -#if defined(TCP) && ! defined(INFINIBAND) +#if defined(TCP) && ! 
defined(INFINIBAND) && !defined(OPENIB) printf("due to socket buffer size limitations\n\n"); #endif } } @@ -990,7 +1012,7 @@ void PrintUsage() { printf("\n NETPIPE USAGE \n\n"); -#if ! defined(INFINIBAND) +#if ! defined(INFINIBAND) && !defined(OPENIB) printf("a: asynchronous receive (a.k.a. preposted receive)\n"); #endif printf("B: burst all preposts before measuring performance\n"); @@ -998,7 +1020,7 @@ printf("b: specify TCP send/receive socket buffer sizes\n"); #endif -#if defined(INFINIBAND) +#if defined(INFINIBAND) || defined(OPENIB) printf("c: specify type of completion <-c type>\n" " valid types: local_poll, vapi_poll, event\n" " default: local_poll\n"); @@ -1010,7 +1032,7 @@ printf(" all MPI-2 implementations\n"); #endif -#if defined(TCP) || defined(INFINIBAND) +#if defined(TCP) || defined(INFINIBAND) || defined(OPENIB) printf("h: specify hostname of the receiver <-h host>\n"); #endif @@ -1019,7 +1041,7 @@ printf("i: Do an integrity check instead of measuring performance\n"); printf("l: lower bound start value e.g. <-l 1>\n"); -#if defined(INFINIBAND) +#if defined(INFINIBAND) || defined(OPENIB) printf("m: set MTU for Infiniband adapter <-m mtu_size>\n"); printf(" valid sizes: 256, 512, 1024, 2048, 4096 (default 1024)\n"); #endif @@ -1030,7 +1052,7 @@ printf("p: set the perturbation number <-p 1>\n" " (default = 3 Bytes, set to 0 for no perturbations)\n"); -#if defined(TCP) && ! defined(INFINIBAND) +#if defined(TCP) && ! 
defined(INFINIBAND) && !defined(OPENIB) printf("r: reset sockets for every trial\n"); #endif @@ -1039,7 +1061,7 @@ printf("S: Use synchronous sends.\n"); #endif -#if defined(INFINIBAND) +#if defined(INFINIBAND) || defined(OPENIB) printf("t: specify type of communications <-t type>\n" " valid types: send_recv, send_recv_with_imm,\n" " rdma_write, rdma_write_with_imm\n" @@ -1056,7 +1078,7 @@ #if defined(MPI) printf(" May need to use -a to choose asynchronous communications for MPI/n"); #endif -#if defined(TCP) && !defined(INFINIBAND) +#if defined(TCP) && !defined(INFINIBAND) && !defined(OPENIB) printf(" The maximum test size is limited by the TCP buffer size/n"); #endif printf("\n"); @@ -1131,7 +1153,7 @@ memset(p->s_buff, 'b', nbytes+soffset); } -#if !defined(INFINIBAND) && !defined(ARMCI) && !defined(LAPI) && !defined(GPSHMEM) && !defined(SHMEM) && !defined(GM) +#if !defined(OPENIB) && !defined(INFINIBAND) && !defined(ARMCI) && !defined(LAPI) && !defined(GPSHMEM) && !defined(SHMEM) && !defined(GM) void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset) { --- NetPIPE_3.6.2.orig/src/netpipe.h 2004-06-22 12:38:41.000000000 -0700 +++ NetPIPE_3.6.2/src/netpipe.h 2005-03-14 16:20:30.000000000 -0800 @@ -27,6 +27,10 @@ #include <ib_defs.h> /* ib_mtu_t */ #endif +#ifdef OPENIB +#include <infiniband/verbs.h> /* enum ibv_mtu */ +#endif + #ifdef FINAL #define TRIALS 7 #define RUNTM 0.25 @@ -73,9 +77,14 @@ int commtype; /* Communications type */ int comptype; /* Completion type */ #endif +#if defined(OPENIB) + enum ibv_mtu ib_mtu; /* MTU Size for Infiniband HCA */ + int commtype; /* Communications type */ + int comptype; /* Completion type */ +#endif }; -#if defined(INFINIBAND) +#if defined(INFINIBAND) || defined(OPENIB) enum completion_types { NP_COMP_LOCALPOLL, /* Poll locally on last byte of data */ NP_COMP_VAPIPOLL, /* Poll using vapi function */ _______________________________________________ openib-general mailing list openib-general@openib.org 
http://openib.org/mailman/listinfo/openib-general To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general