Re: [PATCH for-next v3 3/7] RDMA/rxe: Cleanup code for responder Atomic operations

2023-01-16 Thread Jason Gunthorpe
On Fri, Dec 23, 2022 at 03:51:54PM +0900, Daisuke Matsuda wrote:
> @@ -733,60 +734,83 @@ static enum resp_states process_flush(struct rxe_qp *qp,
>  /* Guarantee atomicity of atomic operations at the machine level. */
>  static DEFINE_SPINLOCK(atomic_ops_lock);
>  
> -static enum resp_states atomic_reply(struct rxe_qp *qp,
> -  struct rxe_pkt_info *pkt)
> +enum resp_states rxe_process_atomic(struct rxe_qp *qp,
> + struct rxe_pkt_info *pkt, u64 *vaddr)
>  {
> - u64 *vaddr;
>   enum resp_states ret;
> - struct rxe_mr *mr = qp->resp.mr;
>   struct resp_res *res = qp->resp.res;
>   u64 value;
>  
> - if (!res) {
> - res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
> - qp->resp.res = res;
> + /* check vaddr is 8 bytes aligned. */
> + if (!vaddr || (uintptr_t)vaddr & 7) {
> + ret = RESPST_ERR_MISALIGNED_ATOMIC;
> + goto out;
>   }
>  
> - if (!res->replay) {
> - if (mr->state != RXE_MR_STATE_VALID) {
> - ret = RESPST_ERR_RKEY_VIOLATION;
> - goto out;
> - }
> + spin_lock(&atomic_ops_lock);
> + res->atomic.orig_val = value = *vaddr;
>  
> - vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
> - sizeof(u64));

I think you need to properly fix the lifetime problem with iova_to_vaddr
function, not hack around it like this.

iova_to_vaddr should be able to return an IOVA for ODP just fine - the
reason it can't is the same bug it has with normal MRs, the mapping
can just change under the feet and there is no protective locking.

If you are going to follow the same ODP design as mlx5 then
fundamentally all ODP does to the MR is add a not-present bit and
allow the MR pages to churn rapidly.

Make the MR safe to changes in the page references against races and
ODP will work just fine.

This will be easier on top of Bob's xarray patch, please check what he
has there and test it.

Thanks,
Jason



[PATCH for-next v3 3/7] RDMA/rxe: Cleanup code for responder Atomic operations

2022-12-22 Thread Daisuke Matsuda
Currently, rxe_responder() directly calls the function to execute Atomic
operations. This needs to be modified to insert some conditional branches
for the ODP feature. Additionally, rxe_resp.h is newly added to be used by
rxe_odp.c in the near future.

Signed-off-by: Daisuke Matsuda 
---
 drivers/infiniband/sw/rxe/rxe_resp.c | 100 +--
 drivers/infiniband/sw/rxe/rxe_resp.h |   9 +++
 2 files changed, 71 insertions(+), 38 deletions(-)
 create mode 100644 drivers/infiniband/sw/rxe/rxe_resp.h

diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c 
b/drivers/infiniband/sw/rxe/rxe_resp.c
index 991550baef8c..e18bca076337 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -9,6 +9,7 @@
 #include "rxe.h"
 #include "rxe_loc.h"
 #include "rxe_queue.h"
+#include "rxe_resp.h"
 
 enum resp_states {
RESPST_NONE,
@@ -733,60 +734,83 @@ static enum resp_states process_flush(struct rxe_qp *qp,
 /* Guarantee atomicity of atomic operations at the machine level. */
 static DEFINE_SPINLOCK(atomic_ops_lock);
 
-static enum resp_states atomic_reply(struct rxe_qp *qp,
-struct rxe_pkt_info *pkt)
+enum resp_states rxe_process_atomic(struct rxe_qp *qp,
+   struct rxe_pkt_info *pkt, u64 *vaddr)
 {
-   u64 *vaddr;
enum resp_states ret;
-   struct rxe_mr *mr = qp->resp.mr;
struct resp_res *res = qp->resp.res;
u64 value;
 
-   if (!res) {
-   res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
-   qp->resp.res = res;
+   /* check vaddr is 8 bytes aligned. */
+   if (!vaddr || (uintptr_t)vaddr & 7) {
+   ret = RESPST_ERR_MISALIGNED_ATOMIC;
+   goto out;
}
 
-   if (!res->replay) {
-   if (mr->state != RXE_MR_STATE_VALID) {
-   ret = RESPST_ERR_RKEY_VIOLATION;
-   goto out;
-   }
+   spin_lock(&atomic_ops_lock);
+   res->atomic.orig_val = value = *vaddr;
 
-   vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
-   sizeof(u64));
+   if (pkt->opcode == IB_OPCODE_RC_COMPARE_SWAP) {
+   if (value == atmeth_comp(pkt))
+   value = atmeth_swap_add(pkt);
+   } else {
+   value += atmeth_swap_add(pkt);
+   }
 
-   /* check vaddr is 8 bytes aligned. */
-   if (!vaddr || (uintptr_t)vaddr & 7) {
-   ret = RESPST_ERR_MISALIGNED_ATOMIC;
-   goto out;
-   }
+   *vaddr = value;
+   spin_unlock(&atomic_ops_lock);
 
-   spin_lock_bh(&atomic_ops_lock);
-   res->atomic.orig_val = value = *vaddr;
+   qp->resp.msn++;
 
-   if (pkt->opcode == IB_OPCODE_RC_COMPARE_SWAP) {
-   if (value == atmeth_comp(pkt))
-   value = atmeth_swap_add(pkt);
-   } else {
-   value += atmeth_swap_add(pkt);
-   }
+   /* next expected psn, read handles this separately */
+   qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
+   qp->resp.ack_psn = qp->resp.psn;
 
-   *vaddr = value;
-   spin_unlock_bh(&atomic_ops_lock);
+   qp->resp.opcode = pkt->opcode;
+   qp->resp.status = IB_WC_SUCCESS;
 
-   qp->resp.msn++;
+   ret = RESPST_ACKNOWLEDGE;
+out:
+   return ret;
+}
+
+static enum resp_states rxe_atomic_ops(struct rxe_qp *qp,
+   struct rxe_pkt_info *pkt,
+   struct rxe_mr *mr)
+{
+   u64 *vaddr;
+   int ret;
+
+   vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
+ sizeof(u64));
+
+   if (pkt->mask & RXE_ATOMIC_MASK)
+   ret = rxe_process_atomic(qp, pkt, vaddr);
+   else
+   ret = RESPST_ERR_UNSUPPORTED_OPCODE;
+
+   return ret;
+}
 
-   /* next expected psn, read handles this separately */
-   qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
-   qp->resp.ack_psn = qp->resp.psn;
+static enum resp_states rxe_atomic_reply(struct rxe_qp *qp,
+struct rxe_pkt_info *pkt)
+{
+   struct rxe_mr *mr = qp->resp.mr;
+   struct resp_res *res = qp->resp.res;
+   int ret;
 
-   qp->resp.opcode = pkt->opcode;
-   qp->resp.status = IB_WC_SUCCESS;
+   if (!res) {
+   res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
+   qp->resp.res = res;
}
 
-   ret = RESPST_ACKNOWLEDGE;
-out:
+   if (!res->replay) {
+   if (mr->state != RXE_MR_STATE_VALID)
+   return RESPST_ERR_RKEY_VIOLATION;
+   ret = rxe_atomic_ops(qp, pkt, mr);
+   } else
+   ret = RESPST_ACKNOWLEDGE;