Libor, here's a first attempt at solving an old problem.
Most of the patch just passes sdp_opt to the iocb complete and cancel functions.

I haven't tested it yet, and I won't have time to do so today,
but maybe you could tell me whether I'm going in the right
direction with this.

---

Use the "users" count in the connection lock to prevent more iocbs from
completing before the current aio is done.
Any synchronous transfer will then wait until the socket is unlocked.
(Once we switch to get_user_pages, we'll be able to do this only for receive
iocbs.)

Signed-off-by: Michael S. Tsirkin <[EMAIL PROTECTED]>

Index: sdp_write.c
===================================================================
--- sdp_write.c (revision 2726)
+++ sdp_write.c (working copy)
@@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_opt *conn
                SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
                SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-               sdp_iocb_complete(iocb, 0);
+               sdp_iocb_complete(conn, iocb, 0);
 
                break;
        case SDP_DESC_TYPE_NONE:
Index: sdp_rcvd.c
===================================================================
--- sdp_rcvd.c  (revision 2726)
+++ sdp_rcvd.c  (working copy)
@@ -152,7 +152,7 @@ static int sdp_rcvd_send_sm(struct sdp_o
 
                        conn->src_sent--;
 
-                       sdp_iocb_complete(iocb, 0);
+                       sdp_iocb_complete(conn, iocb, 0);
                }
                /*
                 * Cancel complete, clear the state.
@@ -209,7 +209,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_o
 
        conn->snk_sent--;
 
-       sdp_iocb_complete(iocb, 0);
+       sdp_iocb_complete(conn, iocb, 0);
 
        return 0;
 }
@@ -270,7 +270,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_o
                SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
                SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-               sdp_iocb_complete(iocb, 0);
+               sdp_iocb_complete(conn, iocb, 0);
        }
        /*
         * If Source Cancel was in process, and there are no more outstanding
@@ -562,7 +562,7 @@ static int sdp_rcvd_snk_cancel_ack(struc
        while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) {
 
                conn->snk_sent--;
-               sdp_iocb_complete(iocb, 0);
+               sdp_iocb_complete(conn, iocb, 0);
        }
        /*
         * cancellation is complete. Cancel flag is cleared in RECV post.
@@ -684,7 +684,7 @@ static int sdp_rcvd_snk_avail(struct sdp
                                sdp_desc_q_put_head(&conn->send_queue,
                                                    (struct sdpc_desc *)iocb);
                        else
-                               sdp_iocb_complete(iocb, 0);
+                               sdp_iocb_complete(conn, iocb, 0);
                }
                /*
                 * If Source Cancel was in process, it should now 
Index: sdp_proto.h
===================================================================
--- sdp_proto.h (revision 2726)
+++ sdp_proto.h (working copy)
@@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc
 
 struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
 
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
+void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
+                      u32 mask, ssize_t comp);
 
 void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
 
@@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb *
 
 void sdp_iocb_release(struct sdpc_iocb *iocb);
 
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status);
+void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
+                      ssize_t status);
 
 int sdp_iocb_lock(struct sdpc_iocb *iocb);
 
Index: sdp_read.c
===================================================================
--- sdp_read.c  (revision 2726)
+++ sdp_read.c  (working copy)
@@ -205,7 +205,7 @@ int sdp_event_read(struct sdp_opt *conn,
                SDP_CONN_STAT_READ_INC(conn, iocb->post);
                SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
 
-               sdp_iocb_complete(iocb, 0);
+               sdp_iocb_complete(conn, iocb, 0);
 
                break;
        case SDP_DESC_TYPE_NONE:
@@ -226,7 +226,7 @@ int sdp_event_read(struct sdp_opt *conn,
                SDP_CONN_STAT_READ_INC(conn, iocb->post);
                SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
 
-               sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+               sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
 
                break;
        default:
Index: sdp_send.c
===================================================================
--- sdp_send.c  (revision 2726)
+++ sdp_send.c  (working copy)
@@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp
                        SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
                        SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
 
-                       sdp_iocb_complete(iocb, 0);
+                       sdp_iocb_complete(conn, iocb, 0);
                }
 
                goto done;
@@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct 
                         * needs to be compelted.
                         */
                        if (iocb->post > 0) {
-                               sdp_iocb_complete(iocb, 0);
+                               sdp_iocb_complete(conn, iocb, 0);
                                result = -EAGAIN;
                        } else {
                                sdp_iocb_destroy(iocb);
Index: sdp_conn.c
===================================================================
--- sdp_conn.c  (revision 2726)
+++ sdp_conn.c  (working copy)
@@ -697,7 +697,8 @@ static int sdp_desc_q_cancel_lookup_func
        return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE);
 }
 
-static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error)
+static void sdp_desc_q_cancel_iocb(struct sdp_opt *conn,
+                                  struct sdpc_desc_q *table, ssize_t error)
 {
        struct sdpc_iocb *iocb;
 
@@ -707,24 +708,24 @@ static void sdp_desc_q_cancel_iocb(struc
                         NULL))) {
 
                sdp_iocb_q_remove(iocb);
-               sdp_iocb_complete(iocb, error);
+               sdp_iocb_complete(conn, iocb, error);
        }
 }
 
 void sdp_iocb_q_cancel_all_read(struct sdp_opt *conn, ssize_t error)
 {
-       sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error);
-       sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error);
+       sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error);
+       sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error);
 
-       sdp_desc_q_cancel_iocb(&conn->r_src, error);
+       sdp_desc_q_cancel_iocb(conn, &conn->r_src, error);
 }
 
 void sdp_iocb_q_cancel_all_write(struct sdp_opt *conn, ssize_t error)
 {
-       sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error);
+       sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error);
 
-       sdp_desc_q_cancel_iocb(&conn->send_queue, error);
-       sdp_desc_q_cancel_iocb(&conn->w_snk, error);
+       sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error);
+       sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error);
 }
 
 void sdp_iocb_q_cancel_all(struct sdp_opt *conn, ssize_t error)
Index: sdp_recv.c
===================================================================
--- sdp_recv.c  (revision 2726)
+++ sdp_recv.c  (working copy)
@@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str
        /*
         * callback to complete IOCB
         */
-       sdp_iocb_complete(iocb, 0);
+       sdp_iocb_complete(conn, iocb, 0);
 
        return (buff->tail - buff->data);
 }
@@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st
                /*
                 * callback to complete IOCB
                 */
-               sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+               sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
        }
 
        return (buff->tail - buff->data);
@@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k
                                /*
                                 * callback to complete IOCB, or drop reference
                                 */
-                               sdp_iocb_complete(iocb, 0);
+                               sdp_iocb_complete(conn, iocb, 0);
                                result = -EAGAIN;
                        }
                        else {
Index: sdp_conn.h
===================================================================
--- sdp_conn.h  (revision 2726)
+++ sdp_conn.h  (working copy)
@@ -471,7 +471,7 @@ static inline void sdp_conn_unlock(struc
                sdp_conn_internal_unlock(conn);
        }
 
-       conn->lock.users = 0;
+       --conn->lock.users;
        wake_up(&conn->lock.waitq);
 
        spin_unlock_irqrestore(&conn->lock.slock, flags);
Index: sdp_iocb.c
===================================================================
--- sdp_iocb.c  (revision 2726)
+++ sdp_iocb.c  (working copy)
@@ -508,6 +508,8 @@ static void do_iocb_complete(void *arg)
         * we ignore the value.
         */
        (void)aio_complete(iocb->req, value, 0);
+
+       sdp_conn_unlock(iocb->conn);
        /*
         * delete IOCB
         */
@@ -517,11 +519,14 @@ static void do_iocb_complete(void *arg)
 /*
  * sdp_iocb_complete - complete an IOCB
  */
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status)
+void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
+                      ssize_t status)
 {
        iocb->status = status;
        
        if (in_atomic() || irqs_disabled()) {
+               conn->lock.users++;
+               iocb->conn = conn;
                INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
                schedule_work(&iocb->completion);
        } else
@@ -750,7 +755,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc
 /*
  * sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue
  */
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp)
+void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
+                      u32 mask, ssize_t comp)
 {
        struct sdpc_iocb *iocb;
        struct sdpc_iocb *next;
@@ -772,7 +778,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_
                                    iocb->post, iocb->len);
 
                        sdp_iocb_q_remove(iocb);
-                       sdp_iocb_complete(iocb, comp);
+                       sdp_iocb_complete(conn, iocb, comp);
                }
 
                iocb = next;
Index: sdp_iocb.h
===================================================================
--- sdp_iocb.h  (revision 2726)
+++ sdp_iocb.h  (working copy)
@@ -109,7 +109,8 @@ struct sdpc_iocb {
        int           page_count;  /* number of physical pages. */
        int           page_offset; /* offset into first page. */
 
-       struct work_struct completion; /* task for defered completion. */
+       struct work_struct completion; /* task for deferred completion. */
+       struct sdp_opt *conn; /* conn to unlock for deferred completion. */
        /*
         * kernel iocb structure
         */

-- 
MST
_______________________________________________
openib-general mailing list
openib-general@openib.org
http://openib.org/mailman/listinfo/openib-general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to