Hello all,

I am seeing weird behavior with libibverbs + libmthca, which makes me
suspect either libmthca or the HCA firmware.

The source of the test code is attached.

I have a pair of processes talking to each other. They are very similar to
ibv_rc_pingpong. The difference is that my processes issue
ibv_post_send() with IBV_WR_RDMA_WRITE_WITH_IMM, and each tries to keep
several (namely 4) Receive and Send Work Requests outstanding.

All Send Work Requests are sequentially numbered. The number is placed into
the wr_id and imm_data fields. When the process receives a Send Work
Completion, wr_id is checked for consistency with the sent numbers, and the
next Send Work Request is posted (RDMA Write with IMM, 32 bytes of
out-of-line data). So far so good.
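
In outline (condensed from ctx_try_send() in the attached code; the locking
and the credit bookkeeping are elided), each send is posted like this:

	union imm_t imm;
	imm.all=0;
	imm.seqn=ctx->send_wr_id_req;	//16 LSBs of the sequence number
	struct ibv_sge list;
	list.addr=(size_t)&ctx->payload;	//32 bytes of registered memory
	list.length=32;
	list.lkey=ctx->ibv_memory_region->lkey;
	struct ibv_send_wr wr;
	memset(&wr,0,sizeof(wr));
	wr.sg_list=&list;
	wr.num_sge=1;
	wr.wr.rdma.remote_addr=ctx->ibv_remote_vptr;
	wr.wr.rdma.rkey=ctx->ibv_remote_rkey;
	wr.wr_id=ctx->send_wr_id_req++;	//same sequence number in wr_id...
	wr.imm_data=imm.all;		//...and in imm_data
	wr.opcode=IBV_WR_RDMA_WRITE_WITH_IMM;
	wr.send_flags=IBV_SEND_SIGNALED;
	struct ibv_send_wr* bad_wr;
	int code=ibv_post_send(ctx->ibv_queue_pair,&wr,&bad_wr);
	assert(code==0);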

All Receive Work Requests are sequentially numbered as well. The number is
placed into the wr_id field. When the process gets a Receive Work
Completion, it checks both the wr_id and imm_data for consistency with the
expected numbers, and posts the next Receive Work Request. The consistency
test eventually fails (after a few hundred thousand iterations - a few
seconds). The Completion status is "success", wr_id is out of order,
imm_data is in order. Despite the inconsistency, the process still tries to
post the next Receive Work Request, which fails as if the Receive Queue were
full (I modified libmthca's mthca_tavor_post_recv() to return distinct error
codes; that is where code=-2 in the output below comes from). All subsequent
Receive Work Completions fail the consistency test, and ibv_post_recv()
fails in the same manner. Eventually everything stalls, blocked inside
ibv_get_cq_event() waiting for Work Completions that never arrive.

I believe that, in this test, the wr_id values in Receive Work Completions
must arrive in order, but they do not.
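
For reference, the ordering check (condensed from ctx_handle_completion() in
the attached code) is simply:

	//receive completions are expected back in posting order
	uint wr_id=ctx->recv_wr_id_rsp;
	for (wc=wc_beg; !ctx->bad && wc!=wc_end; ++wc) {
		if (!(wc->opcode&IBV_WC_RECV)) continue;	//skip send completions
		ctx->bad|=wc->wr_id!=wr_id++;	//latches any out-of-order wr_id
	}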

I am sure that the "queue overflow" failures of ibv_post_recv() are illegal,
because I keep the Receive Queue no more than half-full.
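
The arithmetic, from the constants in the attached code:

	#define ctx_ring_cap	4	//outstanding messages per direction
	#define ctx_ibv_recv_queue_cap (ctx_ring_cap*2)	//RQ depth = 8

A new Receive Work Request is posted only after one completes, so at most 4
of the 8 Receive Queue entries are ever in use.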

The test fails with libibverbs-1.0.5 (and older), libmthca-1.0.4 (and older).

~>uname -a
Linux node02 2.6.23.9 #1 SMP PREEMPT Fri Nov 30 21:23:11 CST 2007 x86_64
x86_64 x86_64 GNU/Linux

~>grep 'model name' /proc/cpuinfo
model name      : Dual Core AMD Opteron(tm) Processor 285
model name      : Dual Core AMD Opteron(tm) Processor 285

~>ibv_devinfo
hca_id: mthca0
         fw_ver:                         4.8.200
         node_guid:                      0002:c902:0024:42f4
         sys_image_guid:                 0002:c902:0024:42f7
         vendor_id:                      0x02c9
         vendor_part_id:                 25208
         hw_ver:                         0xA0
         board_id:                       MT_0330140002
         phys_port_cnt:                  2

The attached source code evolved from a huge application in an attempt to
reduce the code to a reasonable size, so it looks weird. Compile it with
"gcc -O2 flaw.c -o flaw -lpthread -libverbs". On one end run "sudo ./flaw";
on the other end run "sudo ./flaw <hostname>", where <hostname> is the name
of the first end. It fails much sooner if you start another pair of
processes.

This is a typical output of the program:

~>./flaw
device mthca0
completion thread runs
QP connected
55871
147789
241006
330033
421304
509184
595437
682410
779035
872444
964561
1051060
1138062
1224279
1311327
wr_id=00152d78, recv_wr_id_rsp=00152d76, imm.seqn=00002d76
wr_id=00152d79, recv_wr_id_rsp=00152d77, imm.seqn=00002d77
ibv_post_recv() failed, code=-2
wr_id=00152d79, recv_wr_id_rsp=00152d78, imm.seqn=00002d78
ibv_post_recv() failed, code=-2
wr_id=00152d79, recv_wr_id_rsp=00152d79, imm.seqn=00002d79
ibv_post_recv() failed, code=-2
wr_id=00152d7a, recv_wr_id_rsp=00152d7a, imm.seqn=00002d7a
ibv_post_recv() failed, code=-2

The rows of numbers are the iteration counter, printed once per second. In
this case the test made at least 1311327 successful iterations.

Iteration #1387894 (0x152d76) failed: in a Receive Work Completion, wr_id
was 00152d78 while 00152d76 was expected. The imm_data received was good
(imm.seqn must equal the 16 LSBs of recv_wr_id_rsp). This Completion still
caused the next Receive Work Request to be posted successfully (no error
line is printed for it).

The next iteration, #1387895 (0x152d77), failed in the same fashion, and the
attempt to post the next Receive Work Request got the "work queue overflow"
error (code=-2), which should be impossible because the queue size is 8.

All subsequent iterations failed similarly.

The peer process displays no errors.

I am new to libibverbs, so it is possible that I am misusing it.

Thank you,

Roman Kononov

#include <time.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>	//for memset()
#include <infiniband/verbs.h>

typedef enum { false, true } bool;
//======================================================================
union imm_t {
	struct {
		uint16_t seqn;
		uint8_t credit;
		uint8_t has_payload;
	};
	uint32_t all;
};

struct payload_t {
	char dirt[4096];
};

#define ctx_ring_cap	4
#define ctx_ibv_send_queue_cap (ctx_ring_cap*2)
#define ctx_ibv_recv_queue_cap (ctx_ring_cap*2)
#define ctx_ibv_cmpl_queue_cap (ctx_ibv_send_queue_cap+ctx_ibv_recv_queue_cap)
#define ctx_ibv_portN 1
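//the work queues are twice as deep as the message ring, so with at
//most ctx_ring_cap Work Requests outstanding per direction they
//should never be more than half-full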

struct ctx_t {
	void(*rx_start)(void*);
	void* rx_start_arg;

	struct payload_t payload;

	struct ibv_context* ibv_context;
	enum ibv_mtu  port_max_mtu;
	uint16_t port_lid;
	struct ibv_pd* ibv_protection_domain;
	struct ibv_comp_channel* ibv_completion_channel;
	struct ibv_cq* ibv_completion_queue;
	struct ibv_qp* ibv_queue_pair;
	struct ibv_mr* ibv_memory_region;
	uint32_t ibv_remote_rkey;
	uint64_t ibv_remote_vptr;

	bool bad;
	bool connected;
	uint send_credit;
	uint32_t recv_credit_delta;
	uint32_t recv_wr_id_req;
	uint32_t recv_wr_id_rsp;
	uint32_t send_wr_id_req;
	uint32_t send_wr_id_rsp;
	uint tx_req_count;

	pthread_mutex_t pthread_mutex;
	pthread_t pthread_id;
};

void ctx_completion_thread(struct ctx_t*);
void ctx_handle_completion(struct ctx_t*,struct ibv_wc const*,struct ibv_wc const*);
void ctx_try_send(struct ctx_t*);

struct ctx_t* ctx_create(void(*rx_start)(void*),void* rx_start_arg);
void ctx_get_local_address(struct ctx_t*,char* buffer,size_t buffer_size);
void ctx_connect(struct ctx_t*,char const* buffer);
void ctx_tx_start(struct ctx_t*);
//============================
void ctx_tx_start(struct ctx_t* ctx) {
	int code=pthread_mutex_lock(&ctx->pthread_mutex);
	assert(code==0);
	++ctx->tx_req_count;
	bool connected=ctx->connected;
	code=pthread_mutex_unlock(&ctx->pthread_mutex);
	assert(code==0);
	if (connected) ctx_try_send(ctx);
}
//============================
void ctx_get_local_address(struct ctx_t* ctx,char* buffer,size_t buffer_size) {

	assert(ctx->ibv_queue_pair==0);

	//initialize the IB objects ...

	int device_list_size;
	struct ibv_device** device_list=ibv_get_device_list(&device_list_size);
	assert(device_list!=0);
	int i;
	for (i=0; i!=device_list_size; ++i) {
		struct ibv_device* device=device_list[i];
		printf("device %s\n",ibv_get_device_name(device));
	}
	assert(device_list_size>0);

	ctx->ibv_context=ibv_open_device(device_list[0]);
	assert(ctx->ibv_context!=0);

	struct ibv_device_attr device_attr;
	int code=ibv_query_device(ctx->ibv_context,&device_attr);
	assert(code==0);
	assert(ctx_ibv_portN<=device_attr.phys_port_cnt);

	struct ibv_port_attr port_attr;
	code=ibv_query_port(ctx->ibv_context,ctx_ibv_portN,&port_attr);
	assert(code==0);
	ctx->port_max_mtu=port_attr.max_mtu;
	ctx->port_lid=port_attr.lid;

	ctx->ibv_protection_domain=ibv_alloc_pd(ctx->ibv_context);
	assert(ctx->ibv_protection_domain!=0);

	ctx->ibv_completion_channel=ibv_create_comp_channel(ctx->ibv_context);
	assert(ctx->ibv_completion_channel!=0);

	ctx->ibv_completion_queue=ibv_create_cq(ctx->ibv_context,ctx_ibv_cmpl_queue_cap,ctx,ctx->ibv_completion_channel,0);
	assert(ctx->ibv_completion_queue!=0);

	int access_flags=IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ;
	ctx->ibv_memory_region=ibv_reg_mr(ctx->ibv_protection_domain,&ctx->payload,sizeof(ctx->payload),access_flags);
	assert(ctx->ibv_memory_region!=0);

	struct ibv_qp_init_attr attr1;
	memset(&attr1,0,sizeof(attr1));
	attr1.send_cq=ctx->ibv_completion_queue;
	attr1.recv_cq=ctx->ibv_completion_queue;
	attr1.cap.max_send_wr=ctx_ibv_send_queue_cap;
	attr1.cap.max_recv_wr=ctx_ibv_recv_queue_cap;
	attr1.cap.max_send_sge=1;
	attr1.cap.max_recv_sge=1;
	//attr1.cap.max_inline_data=512;
	attr1.qp_type=IBV_QPT_RC;
	//attr1.sq_sig_all=false;
	ctx->ibv_queue_pair=ibv_create_qp(ctx->ibv_protection_domain,&attr1);
	assert(ctx->ibv_queue_pair!=0);

	struct ibv_qp_attr attr2;
	memset(&attr2,0,sizeof(attr2));
	attr2.qp_state=IBV_QPS_INIT;
	attr2.pkey_index=0;
	attr2.port_num=ctx_ibv_portN;
	attr2.qp_access_flags=access_flags;
	code=ibv_modify_qp(ctx->ibv_queue_pair,&attr2,IBV_QP_STATE|IBV_QP_PKEY_INDEX|IBV_QP_PORT|IBV_QP_ACCESS_FLAGS);
	assert(code==0);

	code=snprintf(buffer,buffer_size,"%x:%llx:%x:%x:%x",
					ctx->ibv_memory_region->rkey,
					(unsigned long long)(size_t)&ctx->payload,
					ctx->port_lid,
					ctx->port_max_mtu,
					ctx->ibv_queue_pair->qp_num);
	assert(code>0 && (size_t)code<buffer_size);

	void*(*func)(void*)=(void*(*)(void*))&ctx_completion_thread;
	code=pthread_create(&ctx->pthread_id,0,func,ctx);
	assert(code==0);
}
//============================
void ctx_connect(struct ctx_t* ctx,char const* buffer) {
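	//pre-post the initial Receive Work Requests; they carry no SGE
	//because RDMA_WRITE_WITH_IMM places the payload directly into the
	//registered region and the receive only consumes the imm_data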

	while (ctx->recv_wr_id_req!=ctx_ring_cap) {
		struct ibv_recv_wr wr;
		memset(&wr,0,sizeof(wr));
		wr.wr_id=ctx->recv_wr_id_req++;
		struct ibv_recv_wr* bad_wr;
		int code=ibv_post_recv(ctx->ibv_queue_pair,&wr,&bad_wr);
		if (code!=0) printf("ibv_post_recv() failed\n");
	}

	int remote_rkey;
	long long remote_vptr;
	int remote_port_lid;
	int remote_port_max_mtu;
	int remote_qp_num;
	int code=sscanf(buffer,"%x:%llx:%x:%x:%x",
						&remote_rkey,
						&remote_vptr,
						&remote_port_lid,
						&remote_port_max_mtu,
						&remote_qp_num);
	assert(code==5);
	ctx->ibv_remote_rkey=remote_rkey;
	ctx->ibv_remote_vptr=remote_vptr;

	//connect the queue pairs

	struct ibv_qp_attr attr;

	//Receive Queue
	memset(&attr,0,sizeof(attr));
	attr.qp_state=IBV_QPS_RTR;
	attr.path_mtu=ctx->port_max_mtu<remote_port_max_mtu?ctx->port_max_mtu:remote_port_max_mtu;
	attr.dest_qp_num=remote_qp_num;
	attr.rq_psn=1;
	attr.max_dest_rd_atomic=1;
	attr.min_rnr_timer=12;
	attr.ah_attr.is_global=false;
	attr.ah_attr.dlid=remote_port_lid;
	attr.ah_attr.sl=0;
	attr.ah_attr.src_path_bits=0;
	attr.ah_attr.port_num=ctx_ibv_portN;
	code=ibv_modify_qp(	ctx->ibv_queue_pair,
						&attr,
						IBV_QP_STATE | IBV_QP_AV		|
							IBV_QP_PATH_MTU				|
							IBV_QP_DEST_QPN				|
							IBV_QP_RQ_PSN				|
							IBV_QP_MAX_DEST_RD_ATOMIC	|
							IBV_QP_MIN_RNR_TIMER);
	assert(code==0);

	//Send Queue
	memset(&attr,0,sizeof(attr));
	attr.qp_state=IBV_QPS_RTS;
	attr.timeout=14;
	attr.retry_cnt=7;
	attr.rnr_retry=7;
	attr.sq_psn=1;
	attr.max_rd_atomic=1;
	code=ibv_modify_qp(	ctx->ibv_queue_pair,
						&attr,
						IBV_QP_STATE | IBV_QP_TIMEOUT |
							IBV_QP_RETRY_CNT	|
							IBV_QP_RNR_RETRY	|
							IBV_QP_SQ_PSN		|
							IBV_QP_MAX_QP_RD_ATOMIC);
	assert(code==0);
	printf("QP connected\n");

	ctx->connected=true;
	ctx_try_send(ctx);
}
//============================
void ctx_try_send(struct ctx_t* ctx) {
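	//Credit-based flow control: send_credit counts receive buffers the
	//peer is known to have posted for us; recv_credit_delta counts
	//buffers reposted locally but not yet advertised back to the peer.
	//Credits travel in imm_data.credit. When there is nothing to
	//transmit (tx_req_count==0) but credits to return, a credit-only
	//message with has_payload=false is sent instead.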

	while (1) {
		int code=pthread_mutex_lock(&ctx->pthread_mutex);
		assert(code==0);
		if (ctx->send_wr_id_req-ctx->send_wr_id_rsp==ctx_ring_cap) break;

		if (ctx->tx_req_count==0) {
			if (ctx->recv_credit_delta==0 || ctx->send_credit==0) break;
			union imm_t imm;
			imm.seqn=ctx->send_wr_id_req;
			imm.credit=ctx->recv_credit_delta;
			imm.has_payload=false;
			ctx->recv_credit_delta=0;
			--ctx->send_credit;
			struct ibv_send_wr wr;
			memset(&wr,0,sizeof(wr));
			wr.wr_id=ctx->send_wr_id_req;
			wr.imm_data=imm.all;
			wr.opcode=IBV_WR_RDMA_WRITE_WITH_IMM;
			wr.send_flags=IBV_SEND_SIGNALED;
			struct ibv_send_wr* bad_wr;
			code=ibv_post_send(ctx->ibv_queue_pair,&wr,&bad_wr);
			assert(code==0);
			++ctx->send_wr_id_req;
			break;
		}

		//spend the last send credit only on a message that also returns
		//credits to the peer; otherwise the credit loop could deadlock
		if (ctx->send_credit<=(ctx->recv_credit_delta==0)) break;

		union imm_t imm;
		imm.seqn=ctx->send_wr_id_req;
		imm.credit=ctx->recv_credit_delta;
		imm.has_payload=true;
		ctx->recv_credit_delta=0;
		--ctx->send_credit;
		struct ibv_sge list;
		struct ibv_send_wr wr;
		memset(&wr,0,sizeof(wr));
		list.addr=(size_t)&ctx->payload;
		list.length=32;
		list.lkey=ctx->ibv_memory_region->lkey;
		wr.sg_list=&list;
		wr.num_sge=1;
		wr.wr.rdma.remote_addr=ctx->ibv_remote_vptr;
		wr.wr.rdma.rkey=ctx->ibv_remote_rkey;
		wr.wr_id=ctx->send_wr_id_req;
		wr.imm_data=imm.all;
		wr.opcode=IBV_WR_RDMA_WRITE_WITH_IMM;
		wr.send_flags=IBV_SEND_SIGNALED;
		struct ibv_send_wr* bad_wr;
		code=ibv_post_send(ctx->ibv_queue_pair,&wr,&bad_wr);
		assert(code==0);
		--ctx->tx_req_count;
		++ctx->send_wr_id_req;

		code=pthread_mutex_unlock(&ctx->pthread_mutex);
		assert(code==0);
	}
	int code=pthread_mutex_unlock(&ctx->pthread_mutex);
	assert(code==0);
}
//============================
void ctx_handle_completion(struct ctx_t* ctx,struct ibv_wc const* wc_beg,struct ibv_wc const* wc_end) {
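	//pass 1 only checks that receive wr_ids arrive in posting order and
	//latches ctx->bad; pass 2 consumes the completions, reposts the
	//receives and accumulates the flow-control credits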

	size_t recv_count=0;
	size_t send_credit_loc=0;
	struct ibv_wc const* wc;

	uint wr_id=ctx->recv_wr_id_rsp;
	for (wc=wc_beg; !ctx->bad && wc!=wc_end; ++wc) {
		if (!(wc->opcode&IBV_WC_RECV)) continue;
		ctx->bad|=wc->wr_id!=wr_id++;
	}

	for (wc=wc_beg; wc!=wc_end; ++wc) {
		assert(wc->status==IBV_WC_SUCCESS);
		if (!(wc->opcode&IBV_WC_RECV)) { //send completion
			assert(wc->wr_id==ctx->send_wr_id_rsp);
			++ctx->send_wr_id_rsp;
		} else { //recv completion
			++recv_count;
			union imm_t imm;
			imm.all=wc->imm_data;
			if (ctx->bad) printf("wr_id=%08x, recv_wr_id_rsp=%08x, imm.seqn=%08x\n",(uint32_t)wc->wr_id,ctx->recv_wr_id_rsp,imm.seqn);
			assert(imm.seqn==(uint16_t)ctx->recv_wr_id_rsp);
			++ctx->recv_wr_id_rsp;
			send_credit_loc+=imm.credit;
			if (imm.has_payload) (*ctx->rx_start)(ctx->rx_start_arg);
			struct ibv_recv_wr wr;
			memset(&wr,0,sizeof(wr));
			wr.wr_id=ctx->recv_wr_id_req++;
			struct ibv_recv_wr* bad_wr;
			int code=ibv_post_recv(ctx->ibv_queue_pair,&wr,&bad_wr);
			if (code!=0) printf("ibv_post_recv() failed, code=%i\n",code);
		}
	}

	int code=pthread_mutex_lock(&ctx->pthread_mutex);
	assert(code==0);
	ctx->send_credit+=send_credit_loc;
	assert(ctx->send_credit<=ctx_ring_cap);
	ctx->recv_credit_delta+=recv_count;
	assert(ctx->recv_credit_delta<=ctx_ring_cap);
	code=pthread_mutex_unlock(&ctx->pthread_mutex);
	assert(code==0);

	ctx_try_send(ctx);
}
//============================
void ctx_completion_thread(struct ctx_t* ctx) {
	printf("completion thread runs\n");
	int code=ibv_req_notify_cq(ctx->ibv_completion_queue,0);
	assert(code==0);
	struct ibv_wc completions[64];
	size_t const max_count=sizeof(completions)/sizeof(completions[0]);
	while (1) {
		struct ibv_cq* cq;
		void* uctx;
		code=ibv_get_cq_event(ctx->ibv_completion_channel,&cq,&uctx);
		assert(code==0);
		assert(cq==ctx->ibv_completion_queue);
		ibv_ack_cq_events(cq,1);	//every event must be acknowledged
		//re-arm notification before draining, so a completion arriving
		//after the final poll still generates an event
		code=ibv_req_notify_cq(ctx->ibv_completion_queue,0);
		assert(code==0);
		while (1) {
			code=ibv_poll_cq(cq,max_count,completions);
			assert(code>=0);
			size_t count=code;
			if (count==0) break;
			ctx_handle_completion(ctx,completions,completions+count);
			if (count<max_count) break;
		}
	}
}
//============================
struct ctx_t* ctx_create(void(*rx_start)(void*),void* rx_start_arg) {

	struct ctx_t* ctx=malloc(sizeof(struct ctx_t));
	assert(ctx!=0);

	ctx->bad=false;
	ctx->connected=false;
	ctx->send_credit=ctx_ring_cap;
	ctx->recv_credit_delta=0;
	ctx->recv_wr_id_req=0;
	ctx->recv_wr_id_rsp=0;
	ctx->send_wr_id_req=(uint32_t)1<<31;	//recv wr_ids count up from 0,
	ctx->send_wr_id_rsp=(uint32_t)1<<31;	//send wr_ids from 2^31
	ctx->tx_req_count=0;

	ctx->rx_start=rx_start;
	ctx->rx_start_arg=rx_start_arg;

	ctx->ibv_context=0;
	ctx->port_max_mtu=0;
	ctx->port_lid=0;
	ctx->ibv_protection_domain=0;
	ctx->ibv_completion_channel=0;

	ctx->ibv_completion_queue=0;
	ctx->ibv_queue_pair=0;
	ctx->ibv_memory_region=0;
	ctx->ibv_remote_rkey=0;
	ctx->ibv_remote_vptr=0;

	int code=pthread_mutex_init(&ctx->pthread_mutex,0);
	assert(code==0);

	return ctx;
}
//======================================================================
#define test_tcp_port 8888

struct test_t {
	pthread_mutex_t pthread_mutex;
	pthread_cond_t  pthread_cond;
	uint rx_count;
	struct ctx_t* ctx;
};

void test_rx_start(struct test_t*);

struct test_t* test_create();
void test_connect_client(struct test_t*,char const* server_name);
void test_connect_server(struct test_t*);
void test_thread(struct test_t*,bool initiator);
//============================
struct test_t* test_create() {
	struct test_t* test=malloc(sizeof(struct test_t));
	assert(test!=0);
	pthread_mutex_init(&test->pthread_mutex,0);
	pthread_cond_init(&test->pthread_cond,0);
	test->rx_count=0;
	test->ctx=ctx_create((void(*)(void*))&test_rx_start,test);
	return test;
}
//============================
void test_connect_server(struct test_t* test) {
	char service[20];
	sprintf(service,"%d",test_tcp_port);
	struct addrinfo hints;
	memset(&hints,0,sizeof(hints));
	hints.ai_flags=AI_PASSIVE;
	hints.ai_family=AF_UNSPEC;
	hints.ai_socktype=SOCK_STREAM;
	struct addrinfo *res,*t;
	int code=getaddrinfo(NULL,service,&hints,&res);
	assert(code==0);
	int ssock=-1;
	for (t=res; t!=0; t=t->ai_next) {
		ssock=socket(t->ai_family,t->ai_socktype,t->ai_protocol);
		if (ssock<0) continue;
		int optval=1;
		code=setsockopt(ssock,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof(optval));
		assert(code==0);
		code=bind(ssock,t->ai_addr,t->ai_addrlen);
		if (code==0) break;
		close(ssock);
		ssock=-1;
	}
	freeaddrinfo(res);
	if (ssock<0) {
		printf("cannot bind port %d\n",test_tcp_port);
		exit(1);
	}
	code=listen(ssock,1);
	assert(code==0);
	struct sockaddr remote_saddr;
	socklen_t remote_saddrlen=sizeof(remote_saddr);
	int hsock=accept(ssock,&remote_saddr,&remote_saddrlen);
	close(ssock);
	assert(hsock>=0);
	assert(remote_saddrlen==sizeof(remote_saddr));
	printf("accepted connection from %i.%i.%i.%i\n",remote_saddr.sa_data[2],remote_saddr.sa_data[3],remote_saddr.sa_data[4],remote_saddr.sa_data[5]);

	char remote_addr[1024];
	code=recv(hsock,remote_addr,sizeof(remote_addr),MSG_WAITALL);
	assert((size_t)code==sizeof(remote_addr));

	char local_addr[1024];
	ctx_get_local_address(test->ctx,local_addr,sizeof(local_addr));
	code=send(hsock,local_addr,sizeof(local_addr),0);
	assert((size_t)code==sizeof(local_addr));

	code=shutdown(hsock,SHUT_RDWR);
	assert(code==0);
	code=close(hsock);
	assert(code==0);

	ctx_connect(test->ctx,remote_addr);
}
//============================
void test_connect_client(struct test_t* test,char const* server_name) {
	char service[20];
	sprintf(service,"%d",test_tcp_port);
	struct addrinfo hints;
	memset(&hints,0,sizeof(hints));
	hints.ai_family=AF_UNSPEC;
	hints.ai_socktype=SOCK_STREAM;
	struct addrinfo* res;
	int code=getaddrinfo(server_name,service,&hints,&res);
	assert(code==0);
	int hsock=-1;
	while (1) {
		printf("connecting to %s\n",server_name);
		struct addrinfo* t;
		for (t=res; t!=0; t=t->ai_next) {
			hsock=socket(t->ai_family,t->ai_socktype,t->ai_protocol);
			if (hsock<0) continue;
			code=connect(hsock,t->ai_addr,t->ai_addrlen);
			if (code==0) break;
			close(hsock);
			hsock=-1;
		}
		if (hsock>=0) break;
		usleep(300000);
	}
	freeaddrinfo(res);
	printf("connected to %s\n",server_name);

	char addr[1024];
	ctx_get_local_address(test->ctx,addr,sizeof(addr));
	code=send(hsock,addr,sizeof(addr),0);
	assert((size_t)code==sizeof(addr));
	code=recv(hsock,addr,sizeof(addr),MSG_WAITALL);
	assert((size_t)code==sizeof(addr));

	code=shutdown(hsock,SHUT_RDWR);
	assert(code==0);
	code=close(hsock);
	assert(code==0);

	ctx_connect(test->ctx,addr);
}
//============================
void test_rx_start(struct test_t* test) {
	//This is called from the "completion" thread
	//Atomically increment test->rx_count and signal the "test" thread
	int code=pthread_mutex_lock(&test->pthread_mutex);
	assert(code==0);
	++test->rx_count;
	code=pthread_cond_signal(&test->pthread_cond);
	assert(code==0);
	code=pthread_mutex_unlock(&test->pthread_mutex);
	assert(code==0);
}
//============================
void test_thread(struct test_t* test,bool initiator) {
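	//the initiator primes the loop with 128 messages; afterwards every
	//received payload (counted in rx_count) triggers exactly one new
	//send, so the message population in flight stays constant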
	int prn_count=0;
	time_t prn_time=time(0);
	uint count=initiator?128:0; //inject initial supply of messages into the loop
	while (1) {
		if (time(0)!=prn_time) {
			prn_time=time(0);
			printf("%u\n",prn_count);
		}
		for (; count!=0; --count) {
			ctx_tx_start(test->ctx);
			++prn_count;
		}
		//do atomically { count=test->rx_count; test->rx_count=0; }
		int code=pthread_mutex_lock(&test->pthread_mutex);
		assert(code==0);
		while (1) {
			count=test->rx_count;
			test->rx_count=0;
			if (count!=0) break;
			code=pthread_cond_wait(&test->pthread_cond,&test->pthread_mutex);
			assert(code==0);
		}
		code=pthread_mutex_unlock(&test->pthread_mutex);
		assert(code==0);
	}
}
//======================================================================
int main(int an,char const* const ar[]) {
	struct test_t* test=test_create();
	bool const initiator=an>=2;
	if (initiator) test_connect_client(test,ar[1]);
	else test_connect_server(test);
	test_thread(test,initiator);
	return 0;
}
//======================================================================
