On 2008-01-23 18:31, Roman Kononov wrote:
On 2008-01-23 17:32, Roland Dreier wrote:
I'd be curious to run it.  It can't hurt to have the test...

This is similar to my previous program. The difference is that this one creates many (up to 10, in test_create()) sets of SQ+RQ+CQ in struct conn_t, which share a single Completion Channel in struct ctx_t. Every conn_t has a ring of receive buffers, a ring of send buffers, a send sequence number and a receive sequence number. Every time a buffer is sent, just before the ibv_post_send() call, the send sequence number is placed into the buffer, imm_data and wr_id. Upon Send Completion, wr_id and the sent buffer must contain the expected send sequence number. Every time a receive buffer is posted, just before the ibv_post_recv() call, the receive sequence number is placed into wr_id. Upon Receive Completion, wr_id, the received buffer and imm_data must contain the expected receive sequence number. These two "musts" are sometimes violated. In my setup, the assertions at lines 303 (receiver) and 287 (sender) of the program file fail; these are the checks of the payload sequence number in conn_handle_completion().

The program has 2 threads. The first one reads the completion channel, validates the Send and Receive Completions, issues ibv_post_recv() and ibv_post_send(). The second one can only issue ibv_post_send().

The program creates 2 QPs. Increasing the number of QPs does not seem to increase the probability of failure.

The program prints <time: iteration_#_for_QP_#0 iteration_#_for_QP_#1>.

In my setup, it seems that if I run two pairs of the program, the failure occurs sooner.

Roman


Sorry, I forgot the program...
#include <time.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <infiniband/verbs.h>
#include <string.h>

/* Local boolean type; the program does not use <stdbool.h>. */
typedef enum { false, true } bool;
//======================================================================
/* 32-bit immediate value carried by every RDMA_WRITE_WITH_IMM.
 * The sender fills the fields and posts `all` as wr.imm_data. The receiver
 * loads wc->imm_data into `all` and reads the fields back. No byte-order
 * conversion is applied, so both peers are assumed to share endianness. */
union imm_t {
	struct {
		uint16_t seqn;        /* low 16 bits of the sender's send sequence number */
		uint8_t credit;       /* receive credits handed back to the peer */
		uint8_t has_payload;  /* nonzero: the message carries application payload */
	};
	uint32_t all;
};

/* HCA port number used for every QP and port query. */
#define ctx_ibv_portN 1

/* Header at the start of every ring slot. The sender stores the sequence
 * number in seqn; completion handling checks it and then overwrites it. */
struct payload_t {
	uint16_t seqn;
	char data[];
};

/* One RC connection: a QP, its CQ, and equally sized rx/tx rings of slots. */
struct conn_t {
	/* Bytes per ring slot. */
	size_t payload_size;
	/* Number of slots per ring; also the send and receive queue depth. */
	size_t ring_cap;

	/* Two halves of one page-aligned allocation: rx ring first, tx ring second.
	 * The rx ring address is advertised to the peer as its RDMA write target. */
	void* rx_ring;
	void* tx_ring;

	/* A single CQ serves both the send and the receive queue. */
	struct ibv_cq* ibv_completion_queue;
	struct ibv_qp* ibv_queue_pair;
	/* One MR covering both rings. */
	struct ibv_mr* ibv_memory_region;
	/* Peer's rkey and rx ring address, parsed in conn_connect(). */
	uint32_t ibv_remote_rkey;
	uint64_t ibv_remote_vptr;

	/* Set once the QP reaches RTS (conn_connect()). */
	bool connected;
	/* Only referenced by commented-out debugging code. */
	bool bad;
	/* Messages we may still post to the peer; each post consumes one, and
	 * credits arrive in the peer's imm_data. */
	uint send_credit;
	/* Receives completed since credits were last returned to the peer. */
	uint32_t recv_credit_delta;
	/* wr_id of the next receive to post / expected wr_id of the next receive completion. */
	uint32_t recv_wr_id_req;
	uint32_t recv_wr_id_rsp;
	/* Sequence number of the next send to post / expected next send completion. */
	uint32_t send_wr_id_req;
	uint32_t send_wr_id_rsp;
	/* Payload sends requested via conn_tx_start() but not yet posted. */
	uint tx_req_count;

	struct ctx_t* ctx;
	/* Guards the send-side state used by conn_tx_start()/conn_try_send() and
	 * the credit update in conn_handle_completion(). */
	pthread_mutex_t pthread_mutex;

	/* Called as rx_start(rx_start_arg) from the completion thread for each
	 * received message whose has_payload flag is set. */
	void(*rx_start)(void*);
	void* rx_start_arg;
};

/* Device-wide state shared by all connections. */
struct ctx_t {
	/* Opened from the first entry of ibv_get_device_list(). */
	struct ibv_context* ibv_context;
	/* From ibv_query_port(port ctx_ibv_portN). NOTE(review): this is the port's
	 * max_mtu, not active_mtu, yet conn_connect() uses it as the QP path MTU.
	 * Confirm it never exceeds the active MTU. */
	enum ibv_mtu  port_max_mtu;
	uint16_t port_lid;
	struct ibv_pd* ibv_protection_domain;
	/* Single completion channel shared by every connection's CQ. */
	struct ibv_comp_channel* ibv_completion_channel;
	/* Connections created by ctx_connection_create(); conn_count entries are valid. */
	struct conn_t* conns[10];
	int conn_count;
};

/* Context API: one device, one shared completion channel, up to 10 connections. */
struct ctx_t* ctx_create();
void ctx_connection_create(struct ctx_t* ctx,
                           size_t payload_size,
                           size_t ring_cap,
                           void(*rx_start)(void*),
                           void* rx_start_arg);
void ctx_completion_thread(struct ctx_t* ctx);

/* Per-connection API. */
void conn_handle_completion(struct conn_t* conn,
                            struct ibv_wc const* wc_beg,
                            struct ibv_wc const* wc_end);
void conn_get_local_address(struct conn_t* conn,char* buffer,size_t buffer_size);
void conn_connect(struct conn_t* conn,char const* buffer);
void conn_try_send(struct conn_t* conn);
void conn_tx_start(struct conn_t* conn);
//============================
/* Request transmission of one payload message on `conn`.
 * The request is counted under the connection mutex. If the QP is already
 * connected, conn_try_send() is called at once to post whatever the ring and
 * credits allow. */
void conn_tx_start(struct conn_t* conn) {
	bool is_connected;
	int rc;

	rc=pthread_mutex_lock(&conn->pthread_mutex);
	assert(rc==0);
	conn->tx_req_count+=1;
	is_connected=conn->connected;
	rc=pthread_mutex_unlock(&conn->pthread_mutex);
	assert(rc==0);

	if (!is_connected) return;
	conn_try_send(conn);
}
//============================
void conn_get_local_address(struct conn_t* conn,char* buffer,size_t buffer_size) {

	assert(conn->ibv_queue_pair==0);

	uint const send_queue_cap=conn->ring_cap;
	uint const recv_queue_cap=conn->ring_cap;
	uint const cmpl_queue_cap=send_queue_cap+recv_queue_cap;

	assert(conn->rx_ring==0 && conn->tx_ring==0);
	size_t memsize=conn->payload_size*conn->ring_cap*2;
	void* rings;
	int code=posix_memalign(&rings,4096,memsize);
	assert(code==0);
	conn->rx_ring=rings;
	conn->tx_ring=rings+memsize/2;

	int access_flags=IBV_ACCESS_LOCAL_WRITE|IBV_ACCESS_REMOTE_WRITE|IBV_ACCESS_REMOTE_READ;
	conn->ibv_memory_region=ibv_reg_mr(conn->ctx->ibv_protection_domain,rings,memsize,access_flags);
	assert(conn->ibv_memory_region!=0);

	conn->ibv_completion_queue=ibv_create_cq(conn->ctx->ibv_context,cmpl_queue_cap,conn,conn->ctx->ibv_completion_channel,0);
	assert(conn->ibv_completion_queue!=0);
	code=ibv_req_notify_cq(conn->ibv_completion_queue,0);
	assert(code==0);

	struct ibv_qp_init_attr attr1;
	memset(&attr1,0,sizeof(attr1));
	attr1.send_cq=conn->ibv_completion_queue;
	attr1.recv_cq=conn->ibv_completion_queue;
	attr1.cap.max_send_wr=send_queue_cap;
	attr1.cap.max_recv_wr=recv_queue_cap;
	attr1.cap.max_send_sge=1;
	attr1.cap.max_recv_sge=1;
	//attr1.cap.max_inline_data=512;
	attr1.qp_type=IBV_QPT_RC;
	//attr1.sq_sig_all=false;
	conn->ibv_queue_pair=ibv_create_qp(conn->ctx->ibv_protection_domain,&attr1);
	assert(conn->ibv_queue_pair!=0);

	struct ibv_qp_attr attr2;
	memset(&attr2,0,sizeof(attr2));
	attr2.qp_state=IBV_QPS_INIT;
	attr2.pkey_index=0;
	attr2.port_num=ctx_ibv_portN;
	attr2.qp_access_flags=access_flags;
	code=ibv_modify_qp(conn->ibv_queue_pair,&attr2,IBV_QP_STATE|IBV_QP_PKEY_INDEX|IBV_QP_PORT|IBV_QP_ACCESS_FLAGS);
	assert(code==0);

	code=snprintf(	buffer,buffer_size,"%x:%x" ":%x:%llx:%x",
					conn->ctx->port_lid,
					conn->ctx->port_max_mtu,
					conn->ibv_memory_region->rkey,
					(long long)conn->rx_ring,
					conn->ibv_queue_pair->qp_num);
	assert(code>0 && (size_t)code<buffer_size);
}
//============================
void conn_connect(struct conn_t* conn,char const* buffer) {

    while (conn->recv_wr_id_req!=conn->ring_cap) {
		struct ibv_recv_wr wr;
		memset(&wr,0,sizeof(wr));
		wr.wr_id=conn->recv_wr_id_req++;
		struct ibv_recv_wr* bad_wr;
		int code=ibv_post_recv(conn->ibv_queue_pair,&wr,&bad_wr);
		if (code!=0) printf("ibv_post_recv() failed\n");
    }

	int remote_port_lid;
	int remote_port_max_mtu;
	int remote_rkey;
	long long remote_vptr;
	int remote_qp_num;
	int code=sscanf(buffer,"%x:%x" ":%x:%llx:%x",
					&remote_port_lid,
					&remote_port_max_mtu,
					&remote_rkey,
					&remote_vptr,
					&remote_qp_num);
	assert(code==5);
	conn->ibv_remote_rkey=remote_rkey;
	conn->ibv_remote_vptr=remote_vptr;

	//connect the queue pairs

	struct ibv_qp_attr attr;

	//Receive Queue
	memset(&attr,0,sizeof(attr));
	attr.qp_state=IBV_QPS_RTR;
	attr.path_mtu=conn->ctx->port_max_mtu<remote_port_max_mtu?conn->ctx->port_max_mtu:remote_port_max_mtu;
	attr.dest_qp_num=remote_qp_num;
	attr.rq_psn=1;
	attr.max_dest_rd_atomic=1;
	attr.min_rnr_timer=12;
	attr.ah_attr.is_global=false;
	attr.ah_attr.dlid=remote_port_lid;
	attr.ah_attr.sl=0;
	attr.ah_attr.src_path_bits=0;
	attr.ah_attr.port_num=ctx_ibv_portN;
	code=ibv_modify_qp(	conn->ibv_queue_pair,
						&attr,
						IBV_QP_STATE | IBV_QP_AV		|
							IBV_QP_PATH_MTU				|
							IBV_QP_DEST_QPN				|
							IBV_QP_RQ_PSN				|
							IBV_QP_MAX_DEST_RD_ATOMIC	|
							IBV_QP_MIN_RNR_TIMER);
	assert(code==0);

	//Send Queue
	memset(&attr,0,sizeof(attr));
	attr.qp_state=IBV_QPS_RTS;
	attr.timeout=14;
	attr.retry_cnt=7;
	attr.rnr_retry=7;
	attr.sq_psn=1;
	attr.max_rd_atomic=1;
	code=ibv_modify_qp(	conn->ibv_queue_pair,
						&attr,
						IBV_QP_STATE | IBV_QP_TIMEOUT |
							IBV_QP_RETRY_CNT	|
							IBV_QP_RNR_RETRY	|
							IBV_QP_SQ_PSN		|
							IBV_QP_MAX_QP_RD_ATOMIC);
	assert(code==0);
	printf("QP connected\n");

	conn->connected=true;
	conn_try_send(conn);
}
//============================
/* Post as many RDMA_WRITE_WITH_IMM messages as the tx ring and credits allow.
 *
 * Each iteration posts one message into the same slot index of the peer's
 * rx ring. It is a payload message if tx_req_count>0. Otherwise it is a
 * credit-only message (has_payload=0, 32 bytes), sent only once at least half
 * a ring of receive credits is owed. The send sequence number goes into the
 * slot header, the immediate data and the wr_id, so conn_handle_completion()
 * on both sides can check it.
 *
 * Locking: the mutex is taken at the top of every iteration and released at
 * the bottom. Every `break` leaves the loop with the mutex still held, and it
 * is released once after the loop.
 */
void conn_try_send(struct conn_t* conn) {
	while (1) {
		int code=pthread_mutex_lock(&conn->pthread_mutex);
		assert(code==0);
		// tx ring full: ring_cap messages posted whose completions are pending
		if (conn->send_wr_id_req-conn->send_wr_id_rsp==conn->ring_cap) break;

		bool has_payload=true;
		if (conn->tx_req_count==0) {
			// No payload pending: return credits only if enough are owed and
			// there is a credit to spend on the message.
			if (conn->recv_credit_delta<conn->ring_cap/2 || conn->send_credit==0) break;
			has_payload=false;
		} else if (conn->send_credit<=(conn->recv_credit_delta==0)) {
			// A payload message that returns no credits needs send_credit>=2,
			// presumably so the last credit stays available for returning credits.
			break;
		}

		size_t const payload_idx=conn->send_wr_id_req%conn->ring_cap;
		// Use char*: ISO C does not define arithmetic on void* (GNU extension).
		struct payload_t* payload=(struct payload_t*)((char*)conn->tx_ring + payload_idx*conn->payload_size);
		payload->seqn=conn->send_wr_id_req;

		union imm_t imm;
		imm.seqn=conn->send_wr_id_req;
		// credit is 8 bits wide; this assumes ring_cap<=255
		imm.credit=conn->recv_credit_delta;
		imm.has_payload=has_payload;
		conn->recv_credit_delta=0;
		--conn->send_credit;
		struct ibv_sge list;
		struct ibv_send_wr wr;
		memset(&wr,0,sizeof(wr));
		list.addr=(size_t)payload;
		// credit-only messages write just the first 32 bytes of the slot
		list.length=has_payload?conn->payload_size:32;
		list.lkey=conn->ibv_memory_region->lkey;
		wr.sg_list=&list;
		wr.num_sge=1;
		wr.wr.rdma.remote_addr=conn->ibv_remote_vptr+payload_idx*conn->payload_size;
		wr.wr.rdma.rkey=conn->ibv_remote_rkey;
		wr.wr_id=conn->send_wr_id_req;
		wr.imm_data=imm.all;
		wr.opcode=IBV_WR_RDMA_WRITE_WITH_IMM;
		wr.send_flags=IBV_SEND_SIGNALED;
		struct ibv_send_wr* bad_wr;
		code=ibv_post_send(conn->ibv_queue_pair,&wr,&bad_wr);
		assert(code==0);
		if (has_payload) --conn->tx_req_count;
		++conn->send_wr_id_req;

		code=pthread_mutex_unlock(&conn->pthread_mutex);
		assert(code==0);
	}
	// every `break` above exits with the mutex held
	int code=pthread_mutex_unlock(&conn->pthread_mutex);
	assert(code==0);
}
//============================
/* Process a batch [wc_beg, wc_end) of work completions for one connection.
 * Called from ctx_completion_thread().
 *
 * Send completions must arrive in order: wc->wr_id and the tx slot header
 * must both match the expected send sequence number. Receive completions must
 * match the expected receive sequence number in wc->wr_id, imm_data.seqn and
 * the rx slot header. Each checked header is then overwritten (0xdead for tx,
 * 0xbeef for rx), presumably so a stale slot is detected if it is seen again.
 *
 * For each receive, the handler also:
 *  - immediately reposts a receive WR,
 *  - accumulates the credits returned by the peer,
 *  - calls rx_start for payload messages.
 * Credits and freed tx slots are then published under the mutex, and
 * conn_try_send() is called.
 */
void conn_handle_completion(struct conn_t* conn,struct ibv_wc const* wc_beg,struct ibv_wc const* wc_end) {

	size_t recv_count=0;
	size_t send_credit_loc=0;
	// Work on a local copy of send_wr_id_rsp. conn_try_send() reads the field
	// under the mutex on another thread, so incrementing it unlocked here was
	// a data race. It is published under the mutex below, after the 0xdead
	// markers for the freed slots are written. This thread is the only
	// writer, so reading it unlocked is safe.
	uint32_t send_wr_id_rsp=conn->send_wr_id_rsp;
	struct ibv_wc const* wc;

	for (wc=wc_beg; wc!=wc_end; ++wc) {
		assert(wc->status==IBV_WC_SUCCESS);
		if (!(wc->opcode&IBV_WC_RECV)) { //send completion
			size_t const payload_idx=send_wr_id_rsp%conn->ring_cap;
			struct payload_t* payload=(struct payload_t*)((char*)conn->tx_ring + payload_idx*conn->payload_size);
			assert(wc->wr_id==send_wr_id_rsp);
			assert((uint16_t)payload->seqn==(uint16_t)send_wr_id_rsp);
			payload->seqn=0xdead;
			++send_wr_id_rsp;
		} else { //recv completion
			++recv_count;
			union imm_t imm;
			imm.all=wc->imm_data;
			size_t const payload_idx=conn->recv_wr_id_rsp%conn->ring_cap;
			struct payload_t* payload=(struct payload_t*)((char*)conn->rx_ring + payload_idx*conn->payload_size);
			assert(wc->wr_id==conn->recv_wr_id_rsp);
			assert(imm.seqn==(uint16_t)conn->recv_wr_id_rsp);
			assert((uint16_t)payload->seqn==(uint16_t)conn->recv_wr_id_rsp);
			payload->seqn=0xbeef;
			++conn->recv_wr_id_rsp;
			send_credit_loc+=imm.credit;
			if (imm.has_payload) (*conn->rx_start)(conn->rx_start_arg);
			// Replace the consumed receive WR right away.
			struct ibv_recv_wr wr;
			memset(&wr,0,sizeof(wr));
			wr.wr_id=conn->recv_wr_id_req++;
			struct ibv_recv_wr* bad_wr;
			int code=ibv_post_recv(conn->ibv_queue_pair,&wr,&bad_wr);
			if (code!=0) printf("ibv_post_recv() failed, code=%i\n",code);
		}
	}

	int code=pthread_mutex_lock(&conn->pthread_mutex);
	assert(code==0);
	conn->send_wr_id_rsp=send_wr_id_rsp;
	conn->send_credit+=send_credit_loc;
	assert(conn->send_credit<=conn->ring_cap);
	conn->recv_credit_delta+=recv_count;
	assert(conn->recv_credit_delta<=conn->ring_cap);
	code=pthread_mutex_unlock(&conn->pthread_mutex);
	assert(code==0);

	conn_try_send(conn);
}
//============================
/* Completion thread body; never returns.
 * Loop:
 *  1. Wait for an event on the shared completion channel.
 *  2. Acknowledge it.
 *  3. Re-arm the CQ that fired.
 *  4. Drain that CQ in batches of up to 64 work completions, passing each
 *     batch to conn_handle_completion() for the connection registered as the
 *     CQ's context. */
void ctx_completion_thread(struct ctx_t* ctx) {
	printf("completion thread runs\n");
	struct ibv_wc completions[64];
	size_t const max_count=sizeof(completions)/sizeof(completions[0]);
	while (1) {
		struct ibv_cq* cq;
		void* uctx;
		int code=ibv_get_cq_event(ctx->ibv_completion_channel,&cq,&uctx);
		assert(code==0);
		// Every event returned by ibv_get_cq_event() must be acknowledged.
		// Otherwise ibv_destroy_cq() on this CQ blocks forever.
		ibv_ack_cq_events(cq,1);
		struct conn_t* conn=uctx;
		assert(conn->ctx==ctx);
		assert(cq==conn->ibv_completion_queue);
		// Re-arm before polling, so completions arriving after the last poll
		// still raise a new event.
		code=ibv_req_notify_cq(cq,0);
		assert(code==0);
		while (1) {
			code=ibv_poll_cq(cq,(int)max_count,completions);
			assert(code>=0);
			size_t count=code;
			if (count==0) break;
			conn_handle_completion(conn,completions,completions+count);
			if (count<max_count) break;
		}
	}
}
//============================
/* pthread start routine for the completion thread. Calling
 * ctx_completion_thread() through a cast void*(*)(void*) pointer is undefined
 * behavior because the function types are incompatible, so adapt it here. */
static void* ctx_completion_thread_start(void* arg) {
	ctx_completion_thread(arg);
	return NULL;
}

/* Create the device context.
 *  - Lists the RDMA devices and opens the first one.
 *  - Queries port ctx_ibv_portN for its LID and max MTU.
 *  - Allocates a protection domain and a completion channel shared by all
 *    connections.
 *  - Starts the completion thread.
 * Failures are checked with assert(). */
struct ctx_t* ctx_create() {

	struct ctx_t* ctx=malloc(sizeof(struct ctx_t));
	assert(ctx!=0);
	memset(ctx,0,sizeof(*ctx));

	int device_list_size;
	struct ibv_device** device_list=ibv_get_device_list(&device_list_size);
	assert(device_list!=0);
	int i;
	for (i=0; i!=device_list_size; ++i) {
		struct ibv_device* device=device_list[i];
		printf("device %s\n",ibv_get_device_name(device));
	}
	assert(device_list_size>0);

	ctx->ibv_context=ibv_open_device(device_list[0]);
	assert(ctx->ibv_context!=0);
	// The list is no longer needed once the device is open; the opened
	// context stays valid after the list is freed.
	ibv_free_device_list(device_list);

	struct ibv_device_attr device_attr;
	int code=ibv_query_device(ctx->ibv_context,&device_attr);
	assert(code==0);
	assert(ctx_ibv_portN<=device_attr.phys_port_cnt);

	struct ibv_port_attr port_attr;
	code=ibv_query_port(ctx->ibv_context,ctx_ibv_portN,&port_attr);
	assert(code==0);
	// NOTE(review): max_mtu, not active_mtu, is later used as the QP path MTU.
	ctx->port_max_mtu=port_attr.max_mtu;
	ctx->port_lid=port_attr.lid;

	ctx->ibv_protection_domain=ibv_alloc_pd(ctx->ibv_context);
	assert(ctx->ibv_protection_domain!=0);

	ctx->ibv_completion_channel=ibv_create_comp_channel(ctx->ibv_context);
	assert(ctx->ibv_completion_channel!=0);

	pthread_t tid;
	code=pthread_create(&tid,0,&ctx_completion_thread_start,ctx);
	assert(code==0);

	return ctx;
}
//============================
/* Allocate a connection with `ring_cap` slots of `payload_size` bytes in each
 * direction and append it to ctx->conns. rx_start(rx_start_arg) is invoked on
 * the completion thread for every received payload message. The rings and QP
 * are created later, by conn_get_local_address(). */
void ctx_connection_create(struct ctx_t* ctx,size_t payload_size,size_t ring_cap,void(*rx_start)(void*),void* rx_start_arg) {
	assert((size_t)ctx->conn_count<sizeof(ctx->conns)/sizeof(ctx->conns[0]));

	struct conn_t* conn=malloc(sizeof(struct conn_t));
	assert(conn!=0);
	memset(conn,0,sizeof(*conn));
	ctx->conns[ctx->conn_count]=conn;

	conn->payload_size=payload_size;
	conn->ring_cap=ring_cap;
	conn->rx_start=rx_start;
	conn->rx_start_arg=rx_start_arg;

	conn->connected=false;
	conn->send_credit=ring_cap;
	conn->recv_credit_delta=0;
	conn->recv_wr_id_req=0;
	conn->recv_wr_id_rsp=0;
	// Use 1u: `1<<31` overflows a 32-bit signed int, which is undefined
	// behavior. The start value's low 16 bits are 0, matching the peer's
	// initial recv_wr_id_rsp of 0, which the 16-bit seqn checks compare against.
	conn->send_wr_id_req=1u<<31;
	conn->send_wr_id_rsp=1u<<31;
	conn->tx_req_count=0;
	conn->ctx=ctx;

	int code=pthread_mutex_init(&conn->pthread_mutex,0);
	assert(code==0);

	++ctx->conn_count;
}
//======================================================================
/* TCP port used for the out-of-band exchange of connection parameters. */
#define test_tcp_port 8888
/* Length of the per-connection counter arrays below (same as the capacity of
 * the ctx_t connection table). */
#define max_conn_count 10

/* Test state shared between the test thread and the completion thread. */
struct test_t {
	/* pthread_mutex guards rx_counts; pthread_cond is signalled after each increment. */
	pthread_mutex_t pthread_mutex;
	pthread_cond_t  pthread_cond;
	/* Payload messages received per connection, not yet collected by test_thread(). */
	uint rx_counts[max_conn_count];
	/* Sends test_thread() requests on its next pass (used only by that thread). */
	uint tx_counts[max_conn_count];
	/* Cumulative sends requested per connection, printed once per second. */
	uint prn_counts[max_conn_count];
	struct ctx_t* ctx;
};

void test_rx_start(struct test_t*,int which_connection);
void test_rx_start0(struct test_t* t) { test_rx_start(t,0); }
void test_rx_start1(struct test_t* t) { test_rx_start(t,1); }
void test_rx_start2(struct test_t* t) { test_rx_start(t,2); }
void test_rx_start3(struct test_t* t) { test_rx_start(t,3); }
void test_rx_start4(struct test_t* t) { test_rx_start(t,4); }
void test_rx_start5(struct test_t* t) { test_rx_start(t,5); }
void test_rx_start6(struct test_t* t) { test_rx_start(t,6); }
void test_rx_start7(struct test_t* t) { test_rx_start(t,7); }
void test_rx_start8(struct test_t* t) { test_rx_start(t,8); }
void test_rx_start9(struct test_t* t) { test_rx_start(t,9); }

struct test_t* test_create();
void test_connect_client(struct test_t*,char const* server_name);
void test_connect_server(struct test_t*);
void test_thread(struct test_t*,bool initiator);
//============================
/* Allocate the zeroed test state, initialise its mutex and condition
 * variable, and create the verbs context (ctx_create() also starts the
 * completion thread). Then create the connections for this run. Alternative
 * connection layouts are kept commented out for experimentation. */
struct test_t* test_create() {
	struct test_t* test=malloc(sizeof(struct test_t));
	assert(test!=0);
	memset(test,0,sizeof(*test));
	int code=pthread_mutex_init(&test->pthread_mutex,0);
	assert(code==0);
	code=pthread_cond_init(&test->pthread_cond,0);
	assert(code==0);
	test->ctx=ctx_create();

/*	ctx_connection_create(test->ctx,1024,	64,(void(*)(void*))&test_rx_start0,test);
	ctx_connection_create(test->ctx,2*1024,	16,(void(*)(void*))&test_rx_start1,test);*/

	ctx_connection_create(test->ctx,1024,		64,(void(*)(void*))&test_rx_start0,test);
	ctx_connection_create(test->ctx,2*1024*1024, 4,(void(*)(void*))&test_rx_start1,test);

/*	ctx_connection_create(test->ctx,1<<10,16,(void(*)(void*))&test_rx_start0,test);
	ctx_connection_create(test->ctx,1<<11,16,(void(*)(void*))&test_rx_start1,test);
	ctx_connection_create(test->ctx,1<<12,16,(void(*)(void*))&test_rx_start2,test);
	ctx_connection_create(test->ctx,1<<13,16,(void(*)(void*))&test_rx_start3,test);
	ctx_connection_create(test->ctx,1<<14,16,(void(*)(void*))&test_rx_start4,test);
	ctx_connection_create(test->ctx,1<<15,16,(void(*)(void*))&test_rx_start5,test);
	ctx_connection_create(test->ctx,1<<16,16,(void(*)(void*))&test_rx_start6,test);
	ctx_connection_create(test->ctx,1<<17,16,(void(*)(void*))&test_rx_start7,test);
	ctx_connection_create(test->ctx,1<<18,16,(void(*)(void*))&test_rx_start8,test);
	ctx_connection_create(test->ctx,1<<19,16,(void(*)(void*))&test_rx_start9,test);*/
	assert(test->ctx->conn_count<=max_conn_count);
	return test;
}
//============================
/* Server side of the out-of-band parameter exchange.
 *  1. Bind and listen on test_tcp_port (IPv4) and accept one peer.
 *  2. For each connection: receive the peer's 1024-byte address record, send
 *     ours, then connect the QP.
 * Exits the process if the port cannot be bound. */
void test_connect_server(struct test_t* test) {
	char service[20];
	snprintf(service,sizeof(service),"%d",test_tcp_port);
	struct addrinfo hints;
	memset(&hints,0,sizeof(hints));
	hints.ai_flags=AI_PASSIVE;
	hints.ai_family=AF_INET;
	hints.ai_socktype=SOCK_STREAM;
	struct addrinfo *res,*t;
	int code=getaddrinfo(NULL,service,&hints,&res);
	assert(code==0);
	int ssock=-1;
	for (t=res; t!=0; t=t->ai_next) {
		ssock=socket(t->ai_family,t->ai_socktype,t->ai_protocol);
		if (ssock<0) continue;
		int optval=1;
		code=setsockopt(ssock,SOL_SOCKET,SO_REUSEADDR,&optval,sizeof(optval));
		assert(code==0);
		code=bind(ssock,t->ai_addr,t->ai_addrlen);
		if (code==0) break;
		close(ssock);
		ssock=-1;
	}
	freeaddrinfo(res);
	if (ssock<0) {
		printf("cannot bind port %d\n",test_tcp_port);
		exit(1);
	}
	code=listen(ssock,1);
	assert(code==0);
	struct sockaddr remote_saddr;
	socklen_t remote_saddrlen=sizeof(remote_saddr);
	int hsock=accept(ssock,&remote_saddr,&remote_saddrlen);
	close(ssock);
	assert(hsock>=0);
	assert(remote_saddrlen==sizeof(remote_saddr));
	// For AF_INET, sa_data[2..5] hold the IPv4 address bytes. sa_data is plain
	// char (often signed), so convert through unsigned char. Otherwise bytes
	// >= 0x80 print as sign-extended values such as 4294967232.
	printf("accepted connection from %u.%u.%u.%u\n",
		(unsigned)(unsigned char)remote_saddr.sa_data[2],
		(unsigned)(unsigned char)remote_saddr.sa_data[3],
		(unsigned)(unsigned char)remote_saddr.sa_data[4],
		(unsigned)(unsigned char)remote_saddr.sa_data[5]);

	int i;
	for (i=0; i!=test->ctx->conn_count; ++i) {
		char remote_addr[1024];
		code=recv(hsock,remote_addr,sizeof(remote_addr),MSG_WAITALL);
		assert((size_t)code==sizeof(remote_addr));

		char local_addr[1024];
		conn_get_local_address(test->ctx->conns[i],local_addr,sizeof(local_addr));
		code=send(hsock,local_addr,sizeof(local_addr),0);
		assert((size_t)code==sizeof(local_addr));

		conn_connect(test->ctx->conns[i],remote_addr);
	}

	code=shutdown(hsock,SHUT_RDWR);
	assert(code==0);
	code=close(hsock);
	assert(code==0);
}
//============================
/* Client (initiator) side of the out-of-band parameter exchange.
 *  1. Resolve server_name and retry the TCP connection every 300 ms until one
 *     of the resolved addresses accepts.
 *  2. For each connection: send our 1024-byte address record first, read the
 *     server's reply, then connect the QP. */
void test_connect_client(struct test_t* test,char const* server_name) {
	char port_str[20];
	sprintf(port_str,"%d",test_tcp_port);

	struct addrinfo hints;
	memset(&hints,0,sizeof(hints));
	hints.ai_family=AF_UNSPEC;
	hints.ai_socktype=SOCK_STREAM;

	struct addrinfo* candidates;
	int rc=getaddrinfo(server_name,port_str,&hints,&candidates);
	assert(rc==0);

	int sock=-1;
	for (;;) {
		printf("connecting to %s\n",server_name);
		struct addrinfo* ai;
		for (ai=candidates; ai!=0; ai=ai->ai_next) {
			sock=socket(ai->ai_family,ai->ai_socktype,ai->ai_protocol);
			if (sock<0) continue;
			if (connect(sock,ai->ai_addr,ai->ai_addrlen)==0) break;
			close(sock);
			sock=-1;
		}
		if (sock>=0) break;
		usleep(300000); // e.g. the server is not listening yet
	}
	freeaddrinfo(candidates);
	printf("connected to %s\n",server_name);

	struct ctx_t* const ctx=test->ctx;
	int idx;
	for (idx=0; idx<ctx->conn_count; ++idx) {
		char addr[1024];
		conn_get_local_address(ctx->conns[idx],addr,sizeof(addr));
		ssize_t n=send(sock,addr,sizeof(addr),0);
		assert((size_t)n==sizeof(addr));
		n=recv(sock,addr,sizeof(addr),MSG_WAITALL);
		assert((size_t)n==sizeof(addr));
		conn_connect(ctx->conns[idx],addr);
	}

	rc=shutdown(sock,SHUT_RDWR);
	assert(rc==0);
	rc=close(sock);
	assert(rc==0);
}
//============================
/* rx_start callback, run on the completion thread. It records one received
 * payload message for connection conn_idx and wakes the test thread waiting
 * on pthread_cond. */
void test_rx_start(struct test_t* test,int conn_idx) {
	pthread_mutex_t* const lock=&test->pthread_mutex;
	int rc;

	rc=pthread_mutex_lock(lock);
	assert(rc==0);
	test->rx_counts[conn_idx]+=1;
	rc=pthread_cond_signal(&test->pthread_cond);
	assert(rc==0);
	rc=pthread_mutex_unlock(lock);
	assert(rc==0);
}
//============================
/* Main test loop; runs on the calling thread and never returns.
 * The initiator seeds 128 send requests per connection. Afterwards, every
 * received payload message (counted by test_rx_start()) is answered with one
 * new send request on the same connection. Once per second the cumulative
 * number of requested sends per connection is printed as
 * "<seconds>: <count0> <count1> ...". */
void test_thread(struct test_t* test,bool initiator) {
	int const conn_count=test->ctx->conn_count;
	int i;
	for (i=0; initiator && i!=conn_count; ++i) test->tx_counts[i]=128; //inject initial supply of messages into the loop
	time_t const time0=time(0);
	time_t prn_time=time0;
	while (1) {
		time_t const now=time(0);
		if (now!=prn_time) {
			prn_time=now;
			// time_t has no portable printf conversion; print it as long.
			printf("%ld:",(long)(prn_time-time0));
			for (i=0; i!=conn_count; ++i) printf(" %u",test->prn_counts[i]);
			printf("\n");
		}
		for (i=0; i!=conn_count; ++i) {
			test->prn_counts[i]+=test->tx_counts[i];
			for (; test->tx_counts[i]!=0; --test->tx_counts[i]) conn_tx_start(test->ctx->conns[i]);
		}
		//do atomically { counts=test->rx_counts; test->rx_counts=0; }
		int code=pthread_mutex_lock(&test->pthread_mutex);
		assert(code==0);
		while (1) {
			uint total_count=0;
			for (i=0; i!=conn_count; ++i) {
				total_count+=test->rx_counts[i];
				test->tx_counts[i]=test->rx_counts[i];
				test->rx_counts[i]=0;
			}
			if (total_count!=0) break;
			code=pthread_cond_wait(&test->pthread_cond,&test->pthread_mutex);
			assert(code==0);
		}
		code=pthread_mutex_unlock(&test->pthread_mutex);
		assert(code==0);
	}
}
//======================================================================
/* Entry point. Given a server name (argv[1]), this process is the initiator:
 * it connects as the TCP client and seeds the message loop. Without an
 * argument it acts as the TCP server. */
int main(int an,char const* const ar[]) {
	struct test_t* const test=test_create();
	bool const initiator=(an>1);
	if (!initiator) test_connect_server(test);
	else test_connect_client(test,ar[1]);
	test_thread(test,initiator);
	return 0;
}
//======================================================================
_______________________________________________
general mailing list
[email protected]
http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to