Libor, here's a stab at solving an old problem. Most of the patch is passing sdp_opt to iocb complete and cancel functions.
I didnt yet test it, and I wont have the time today, but maybe you could tell me whether I'm going in the right direction with this. --- Use "users" in the lock to prevent more iocbs from completing before a current aio is done. Any synchronous transfer would then wait till socket is unlocked. (Once we switch to get_user_pages, we'll be able to do it only for receive iocbs). Signed-off-by: Michael S. Tsirkin <[EMAIL PROTECTED]> Index: sdp_write.c =================================================================== --- sdp_write.c (revision 2726) +++ sdp_write.c (working copy) @@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_opt *conn SDP_CONN_STAT_WRITE_INC(conn, iocb->post); SDP_CONN_STAT_WQ_DEC(conn, iocb->size); - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); break; case SDP_DESC_TYPE_NONE: Index: sdp_rcvd.c =================================================================== --- sdp_rcvd.c (revision 2726) +++ sdp_rcvd.c (working copy) @@ -152,7 +152,7 @@ static int sdp_rcvd_send_sm(struct sdp_o conn->src_sent--; - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); } /* * Cancel complete, clear the state. @@ -209,7 +209,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_o conn->snk_sent--; - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); return 0; } @@ -270,7 +270,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_o SDP_CONN_STAT_WRITE_INC(conn, iocb->post); SDP_CONN_STAT_WQ_DEC(conn, iocb->size); - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); } /* * If Source Cancel was in process, and there are no more outstanding @@ -562,7 +562,7 @@ static int sdp_rcvd_snk_cancel_ack(struc while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) { conn->snk_sent--; - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); } /* * cancellation is complete. Cancel flag is cleared in RECV post. 
@@ -684,7 +684,7 @@ static int sdp_rcvd_snk_avail(struct sdp sdp_desc_q_put_head(&conn->send_queue, (struct sdpc_desc *)iocb); else - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); } /* * If Source Cancel was in process, it should now Index: sdp_proto.h =================================================================== --- sdp_proto.h (revision 2726) +++ sdp_proto.h (working copy) @@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key); -void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp); +void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table, + u32 mask, ssize_t comp); void sdp_iocb_q_remove(struct sdpc_iocb *iocb); @@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb * void sdp_iocb_release(struct sdpc_iocb *iocb); -void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status); +void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb, + ssize_t status); int sdp_iocb_lock(struct sdpc_iocb *iocb); Index: sdp_read.c =================================================================== --- sdp_read.c (revision 2726) +++ sdp_read.c (working copy) @@ -205,7 +205,7 @@ int sdp_event_read(struct sdp_opt *conn, SDP_CONN_STAT_READ_INC(conn, iocb->post); SDP_CONN_STAT_RQ_DEC(conn, iocb->size); - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); break; case SDP_DESC_TYPE_NONE: @@ -226,7 +226,7 @@ int sdp_event_read(struct sdp_opt *conn, SDP_CONN_STAT_READ_INC(conn, iocb->post); SDP_CONN_STAT_RQ_DEC(conn, iocb->size); - sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0); + sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0); break; default: Index: sdp_send.c =================================================================== --- sdp_send.c (revision 2726) +++ sdp_send.c (working copy) @@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp SDP_CONN_STAT_WRITE_INC(conn, iocb->post); 
SDP_CONN_STAT_WQ_DEC(conn, iocb->size); - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); } goto done; @@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct * needs to be compelted. */ if (iocb->post > 0) { - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); result = -EAGAIN; } else { sdp_iocb_destroy(iocb); Index: sdp_conn.c =================================================================== --- sdp_conn.c (revision 2726) +++ sdp_conn.c (working copy) @@ -697,7 +697,8 @@ static int sdp_desc_q_cancel_lookup_func return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE); } -static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error) +static void sdp_desc_q_cancel_iocb(struct sdp_opt *conn, + struct sdpc_desc_q *table, ssize_t error) { struct sdpc_iocb *iocb; @@ -707,24 +708,24 @@ static void sdp_desc_q_cancel_iocb(struc NULL))) { sdp_iocb_q_remove(iocb); - sdp_iocb_complete(iocb, error); + sdp_iocb_complete(conn, iocb, error); } } void sdp_iocb_q_cancel_all_read(struct sdp_opt *conn, ssize_t error) { - sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error); - sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error); + sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error); + sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error); - sdp_desc_q_cancel_iocb(&conn->r_src, error); + sdp_desc_q_cancel_iocb(conn, &conn->r_src, error); } void sdp_iocb_q_cancel_all_write(struct sdp_opt *conn, ssize_t error) { - sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error); + sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error); - sdp_desc_q_cancel_iocb(&conn->send_queue, error); - sdp_desc_q_cancel_iocb(&conn->w_snk, error); + sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error); + sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error); } void sdp_iocb_q_cancel_all(struct sdp_opt *conn, ssize_t error) Index: sdp_recv.c =================================================================== --- sdp_recv.c 
(revision 2726) +++ sdp_recv.c (working copy) @@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str /* * callback to complete IOCB */ - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); return (buff->tail - buff->data); } @@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st /* * callback to complete IOCB */ - sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0); + sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0); } return (buff->tail - buff->data); @@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k /* * callback to complete IOCB, or drop reference */ - sdp_iocb_complete(iocb, 0); + sdp_iocb_complete(conn, iocb, 0); result = -EAGAIN; } else { Index: sdp_conn.h =================================================================== --- sdp_conn.h (revision 2726) +++ sdp_conn.h (working copy) @@ -471,7 +471,7 @@ static inline void sdp_conn_unlock(struc sdp_conn_internal_unlock(conn); } - conn->lock.users = 0; + --conn->lock.users; wake_up(&conn->lock.waitq); spin_unlock_irqrestore(&conn->lock.slock, flags); Index: sdp_iocb.c =================================================================== --- sdp_iocb.c (revision 2726) +++ sdp_iocb.c (working copy) @@ -508,6 +508,8 @@ static void do_iocb_complete(void *arg) * we ignore the value. 
*/ (void)aio_complete(iocb->req, value, 0); + + sdp_conn_unlock(iocb->conn); /* * delete IOCB */ @@ -517,11 +519,14 @@ static void do_iocb_complete(void *arg) /* * sdp_iocb_complete - complete an IOCB */ -void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status) +void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb, + ssize_t status) { iocb->status = status; if (in_atomic() || irqs_disabled()) { + conn->lock.users++; + iocb->conn = conn; INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb); schedule_work(&iocb->completion); } else @@ -750,7 +755,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc /* * sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue */ -void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp) +void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table, + u32 mask, ssize_t comp) { struct sdpc_iocb *iocb; struct sdpc_iocb *next; @@ -772,7 +778,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_ iocb->post, iocb->len); sdp_iocb_q_remove(iocb); - sdp_iocb_complete(iocb, comp); + sdp_iocb_complete(conn, iocb, comp); } iocb = next; Index: sdp_iocb.h =================================================================== --- sdp_iocb.h (revision 2726) +++ sdp_iocb.h (working copy) @@ -109,7 +109,8 @@ struct sdpc_iocb { int page_count; /* number of physical pages. */ int page_offset; /* offset into first page. */ - struct work_struct completion; /* task for defered completion. */ + struct work_struct completion; /* task for deferred completion. */ + struct sdp_opt *conn; /* conn to unlock for deferred completion. */ /* * kernel iocb structure */ -- MST _______________________________________________ openib-general mailing list openib-general@openib.org http://openib.org/mailman/listinfo/openib-general To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general