From: Jack Wang <jinpu.w...@profitbricks.com>

The service accepts connection requests from clients and reserves memory
for them.

It executes RDMA transfers and hands over received data to ibnbd_server.
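
Both the number of receive buffers per session and the maximum I/O size
can be tuned at load time through module parameters, e.g. (illustrative
values; module name assumed from the directory name):

    modprobe ibtrs_server sess_queue_depth=512 max_io_size=131072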

Signed-off-by: Jack Wang <jinpu.w...@profitbricks.com>
Signed-off-by: Kleber Souza <kleber.so...@profitbricks.com>
Signed-off-by: Danil Kipnis <danil.kip...@profitbricks.com>
Signed-off-by: Roman Pen <roman.peny...@profitbricks.com>
---
 drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c | 3744 +++++++++++++++++++++++
 1 file changed, 3744 insertions(+)
 create mode 100644 drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c

diff --git a/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c b/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c
new file mode 100644
index 0000000..513e90a
--- /dev/null
+++ b/drivers/infiniband/ulp/ibtrs_server/ibtrs_srv.c
@@ -0,0 +1,3744 @@
+/*
+ * InfiniBand Transport Layer
+ *
+ * Copyright (c) 2014 - 2017 ProfitBricks GmbH. All rights reserved.
+ * Authors: Fabian Holler <m...@fholler.de>
+ *          Jack Wang <jinpu.w...@profitbricks.com>
+ *          Kleber Souza <kleber.so...@profitbricks.com>
+ *          Danil Kipnis <danil.kip...@profitbricks.com>
+ *          Roman Pen <roman.peny...@profitbricks.com>
+ *          Milind Dumbare <milind.dumb...@gmail.com>
+ *
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions, and the following disclaimer,
+ *    without modification.
+ * 2. Redistributions in binary form must reproduce at minimum a disclaimer
+ *    substantially similar to the "NO WARRANTY" disclaimer below
+ *    ("Disclaimer") and any redistribution must be conditioned upon
+ *    including a substantially similar Disclaimer requirement for further
+ *    binary redistribution.
+ * 3. Neither the names of the above-listed copyright holders nor the names
+ *    of any contributors may be used to endorse or promote products derived
+ *    from this software without specific prior written permission.
+ *
+ * Alternatively, this software may be distributed under the terms of the
+ * GNU General Public License ("GPL") version 2 as published by the Free
+ * Software Foundation.
+ *
+ * NO WARRANTY
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDERS OR CONTRIBUTORS BE LIABLE FOR SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+ * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+ * IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGES.
+ *
+ */
+
+#include <linux/module.h>
+#include <linux/sizes.h>
+#include <linux/utsname.h>
+#include <linux/cpumask.h>
+#include <linux/debugfs.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib.h>
+
+#include <rdma/ibtrs_srv.h>
+#include "ibtrs_srv_sysfs.h"
+#include "ibtrs_srv_internal.h"
+#include <rdma/ibtrs.h>
+#include <rdma/ibtrs_log.h>
+
+MODULE_AUTHOR("ib...@profitbricks.com");
+MODULE_DESCRIPTION("InfiniBand Transport Server");
+MODULE_VERSION(__stringify(IBTRS_VER));
+MODULE_LICENSE("GPL");
+
+#define DEFAULT_MAX_IO_SIZE_KB 128
+#define DEFAULT_MAX_IO_SIZE (DEFAULT_MAX_IO_SIZE_KB * 1024)
+static int max_io_size = DEFAULT_MAX_IO_SIZE;
+#define MAX_REQ_SIZE PAGE_SIZE
+static int rcv_buf_size = DEFAULT_MAX_IO_SIZE + MAX_REQ_SIZE;
+
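+/*
+ * Each session gets sess_queue_depth receive buffers of rcv_buf_size bytes:
+ * room for max_io_size bytes of I/O payload plus one request of up to
+ * MAX_REQ_SIZE. With the defaults that is 128KB + 4KB = 132KB per buffer
+ * (assuming 4KB pages).
+ */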
+static int max_io_size_set(const char *val, const struct kernel_param *kp)
+{
+       int err, ival;
+
+       err = kstrtoint(val, 0, &ival);
+       if (err)
+               return err;
+
+       if (ival < 4096 || ival + MAX_REQ_SIZE > (4096 * 1024) ||
+           (ival + MAX_REQ_SIZE) % 512 != 0) {
+               ERR_NP("Invalid max io size value %d, has to be >= %d,"
+                      " with (value + MAX_REQ_SIZE) <= %d and a multiple"
+                      " of 512\n", ival, 4096, 4096 * 1024);
+               return -EINVAL;
+       }
+
+       max_io_size = ival;
+       rcv_buf_size = max_io_size + MAX_REQ_SIZE;
+       INFO_NP("max io size changed to %d\n", ival);
+
+       return 0;
+}
+
+static const struct kernel_param_ops max_io_size_ops = {
+       .set            = max_io_size_set,
+       .get            = param_get_int,
+};
+module_param_cb(max_io_size, &max_io_size_ops, &max_io_size, 0444);
+MODULE_PARM_DESC(max_io_size,
+                "Max size for each IO request, in bytes"
+                " (default: " __stringify(DEFAULT_MAX_IO_SIZE_KB) "KB)");
+
+#define DEFAULT_SESS_QUEUE_DEPTH 512
+static int sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
+module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
+MODULE_PARM_DESC(sess_queue_depth,
+                "Number of buffers for pending I/O requests to allocate"
+                " per session. Maximum: " __stringify(MAX_SESS_QUEUE_DEPTH)
+                " (default: " __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
+
+#define DEFAULT_INIT_POOL_SIZE 10
+static int init_pool_size = DEFAULT_INIT_POOL_SIZE;
+module_param_named(init_pool_size, init_pool_size, int, 0444);
+MODULE_PARM_DESC(init_pool_size,
+                "Maximum size of the RDMA buffers pool to pre-allocate on"
+                " module load, in number of sessions. (default: "
+                __stringify(DEFAULT_INIT_POOL_SIZE) ")");
+
+#define DEFAULT_POOL_SIZE_HI_WM 100
+static int pool_size_hi_wm = DEFAULT_POOL_SIZE_HI_WM;
+module_param_named(pool_size_hi_wm, pool_size_hi_wm, int, 0444);
+MODULE_PARM_DESC(pool_size_hi_wm,
+                "High watermark value for the size of RDMA buffers pool"
+                " (in number of sessions). Newly allocated buffers will be"
+                " added to the pool until pool_size_hi_wm is reached."
+                " (default: " __stringify(DEFAULT_POOL_SIZE_HI_WM) ")");
+
+static int retry_count = 7;
+
+static int retry_count_set(const char *val, const struct kernel_param *kp)
+{
+       int err, ival;
+
+       err = kstrtoint(val, 0, &ival);
+       if (err)
+               return err;
+
+       if (ival < MIN_RTR_CNT || ival > MAX_RTR_CNT) {
+               ERR_NP("Invalid retry count value %d, has to be"
+                      " >= %d, <= %d\n", ival, MIN_RTR_CNT, MAX_RTR_CNT);
+               return -EINVAL;
+       }
+
+       retry_count = ival;
+       INFO_NP("QP retry count changed to %d\n", ival);
+
+       return 0;
+}
+
+static const struct kernel_param_ops retry_count_ops = {
+       .set            = retry_count_set,
+       .get            = param_get_int,
+};
+module_param_cb(retry_count, &retry_count_ops, &retry_count, 0644);
+
+MODULE_PARM_DESC(retry_count, "Number of times to send the message if the"
+                " remote side didn't respond with Ack or Nack (default: 7,"
+                " min: " __stringify(MIN_RTR_CNT) ", max: "
+                __stringify(MAX_RTR_CNT) ")");
+
+static int default_heartbeat_timeout_ms = DEFAULT_HEARTBEAT_TIMEOUT_MS;
+
+static int default_heartbeat_timeout_set(const char *val,
+                                        const struct kernel_param *kp)
+{
+       int ret, ival;
+
+       ret = kstrtoint(val, 0, &ival);
+       if (ret)
+               return ret;
+
+       ret = ibtrs_heartbeat_timeout_validate(ival);
+       if (ret)
+               return ret;
+
+       default_heartbeat_timeout_ms = ival;
+       INFO_NP("Default heartbeat timeout changed to %d\n", ival);
+
+       return 0;
+}
+
+static const struct kernel_param_ops heartbeat_timeout_ops = {
+       .set            = default_heartbeat_timeout_set,
+       .get            = param_get_int,
+};
+
+module_param_cb(default_heartbeat_timeout_ms, &heartbeat_timeout_ops,
+               &default_heartbeat_timeout_ms, 0644);
+MODULE_PARM_DESC(default_heartbeat_timeout_ms, "Default heartbeat timeout"
+                " in ms, min. " __stringify(MIN_HEARTBEAT_TIMEOUT_MS)
+                " (default: " __stringify(DEFAULT_HEARTBEAT_TIMEOUT_MS) ")");
+
+static char cq_affinity_list[256] = "";
+static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
+
+static void init_cq_affinity(void)
+{
+       sprintf(cq_affinity_list, "0-%d", nr_cpu_ids - 1);
+}
+
+static int cq_affinity_list_set(const char *val, const struct kernel_param *kp)
+{
+       int ret = 0, len = strlen(val);
+       cpumask_var_t new_value;
+
+       if (!strlen(cq_affinity_list))
+               init_cq_affinity();
+
+       if (len >= sizeof(cq_affinity_list))
+               return -EINVAL;
+       if (!alloc_cpumask_var(&new_value, GFP_KERNEL))
+               return -ENOMEM;
+
+       ret = cpulist_parse(val, new_value);
+       if (ret) {
+               ERR_NP("Can't set cq_affinity_list \"%s\": %d\n", val, ret);
+               goto free_cpumask;
+       }
+
+       strlcpy(cq_affinity_list, val, sizeof(cq_affinity_list));
+       *strchrnul(cq_affinity_list, '\n') = '\0';
+       cpumask_copy(&cq_affinity_mask, new_value);
+
+       INFO_NP("cq_affinity_list changed to %*pbl\n",
+               cpumask_pr_args(&cq_affinity_mask));
+free_cpumask:
+       free_cpumask_var(new_value);
+       return ret;
+}
+
+static struct kparam_string cq_affinity_list_kparam_str = {
+       .maxlen = sizeof(cq_affinity_list),
+       .string = cq_affinity_list
+};
+
+static const struct kernel_param_ops cq_affinity_list_ops = {
+       .set    = cq_affinity_list_set,
+       .get    = param_get_string,
+};
+
+module_param_cb(cq_affinity_list, &cq_affinity_list_ops,
+               &cq_affinity_list_kparam_str, 0644);
+MODULE_PARM_DESC(cq_affinity_list, "Sets the list of cpus to use as cq"
+                                  " vectors. (default: use all possible CPUs)");
+
+static char hostname[MAXHOSTNAMELEN] = "";
+
+static int hostname_set(const char *val, const struct kernel_param *kp)
+{
+       int ret = 0, len = strlen(val);
+
+       if (len >= sizeof(hostname))
+               return -EINVAL;
+       strlcpy(hostname, val, sizeof(hostname));
+       *strchrnul(hostname, '\n') = '\0';
+
+       INFO_NP("hostname changed to %s\n", hostname);
+       return ret;
+}
+
+static struct kparam_string hostname_kparam_str = {
+       .maxlen = sizeof(hostname),
+       .string = hostname
+};
+
+static const struct kernel_param_ops hostname_ops = {
+       .set    = hostname_set,
+       .get    = param_get_string,
+};
+
+module_param_cb(hostname, &hostname_ops,
+               &hostname_kparam_str, 0644);
+MODULE_PARM_DESC(hostname, "Sets the hostname of the local server; if set,"
+                " it is sent to the other side and displayed together with"
+                " the addr (default: empty)");
+
+static struct dentry *ibtrs_srv_debugfs_dir;
+static struct dentry *mempool_debugfs_dir;
+
+static struct rdma_cm_id       *cm_id_ip;
+static struct rdma_cm_id       *cm_id_ib;
+static DEFINE_MUTEX(sess_mutex);
+static LIST_HEAD(sess_list);
+static DECLARE_WAIT_QUEUE_HEAD(sess_list_waitq);
+static struct workqueue_struct *destroy_wq;
+
+static LIST_HEAD(device_list);
+static DEFINE_MUTEX(device_list_mutex);
+
+static DEFINE_MUTEX(buf_pool_mutex);
+static LIST_HEAD(free_buf_pool_list);
+static int nr_free_buf_pool;
+static int nr_total_buf_pool;
+static int nr_active_sessions;
+
+static const struct ibtrs_srv_ops *srv_ops;
+enum ssm_ev {
+       SSM_EV_CON_DISCONNECTED,
+       SSM_EV_CON_EST_ERR,
+       SSM_EV_CON_CONNECTED,
+       SSM_EV_SESS_CLOSE,
+       SSM_EV_SYSFS_DISCONNECT
+};
+
+static inline const char *ssm_ev_str(enum ssm_ev ev)
+{
+       switch (ev) {
+       case SSM_EV_CON_DISCONNECTED:
+               return "SSM_EV_CON_DISCONNECTED";
+       case SSM_EV_CON_EST_ERR:
+               return "SSM_EV_CON_EST_ERR";
+       case SSM_EV_CON_CONNECTED:
+               return "SSM_EV_CON_CONNECTED";
+       case SSM_EV_SESS_CLOSE:
+               return "SSM_EV_SESS_CLOSE";
+       case SSM_EV_SYSFS_DISCONNECT:
+               return "SSM_EV_SYSFS_DISCONNECT";
+       default:
+               return "UNKNOWN";
+       }
+}
+
+static const char *ssm_state_str(enum ssm_state state)
+{
+       switch (state) {
+       case SSM_STATE_IDLE:
+               return "SSM_STATE_IDLE";
+       case SSM_STATE_CONNECTED:
+               return "SSM_STATE_CONNECTED";
+       case SSM_STATE_CLOSING:
+               return "SSM_STATE_CLOSING";
+       case SSM_STATE_CLOSED:
+               return "SSM_STATE_CLOSED";
+       default:
+               return "UNKNOWN";
+       }
+}
+
+enum csm_state {
+       CSM_STATE_REQUESTED,
+       CSM_STATE_CONNECTED,
+       CSM_STATE_CLOSING,
+       CSM_STATE_FLUSHING,
+       CSM_STATE_CLOSED
+};
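+
+/*
+ * Intended connection lifecycle (summary; the exact transitions are driven
+ * by the csm_ev events below): REQUESTED on an incoming CM request,
+ * CONNECTED once established, CLOSING/FLUSHING while draining posted work
+ * requests, and finally CLOSED.
+ */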
+
+static inline const char *csm_state_str(enum csm_state s)
+{
+       switch (s) {
+       case CSM_STATE_REQUESTED:
+               return "CSM_STATE_REQUESTED";
+       case CSM_STATE_CONNECTED:
+               return "CSM_STATE_CONNECTED";
+       case CSM_STATE_CLOSING:
+               return "CSM_STATE_CLOSING";
+       case CSM_STATE_FLUSHING:
+               return "CSM_STATE_FLUSHING";
+       case CSM_STATE_CLOSED:
+               return "CSM_STATE_CLOSED";
+       default:
+               return "UNKNOWN";
+       }
+}
+
+enum csm_ev {
+       CSM_EV_CON_REQUEST,
+       CSM_EV_CON_ESTABLISHED,
+       CSM_EV_CON_ERROR,
+       CSM_EV_DEVICE_REMOVAL,
+       CSM_EV_SESS_CLOSING,
+       CSM_EV_CON_DISCONNECTED,
+       CSM_EV_BEACON_COMPLETED
+};
+
+static inline const char *csm_ev_str(enum csm_ev ev)
+{
+       switch (ev) {
+       case CSM_EV_CON_REQUEST:
+               return "CSM_EV_CON_REQUEST";
+       case CSM_EV_CON_ESTABLISHED:
+               return "CSM_EV_CON_ESTABLISHED";
+       case CSM_EV_CON_ERROR:
+               return "CSM_EV_CON_ERROR";
+       case CSM_EV_DEVICE_REMOVAL:
+               return "CSM_EV_DEVICE_REMOVAL";
+       case CSM_EV_SESS_CLOSING:
+               return "CSM_EV_SESS_CLOSING";
+       case CSM_EV_CON_DISCONNECTED:
+               return "CSM_EV_CON_DISCONNECTED";
+       case CSM_EV_BEACON_COMPLETED:
+               return "CSM_EV_BEACON_COMPLETED";
+       default:
+               return "UNKNOWN";
+       }
+}
+
+struct sess_put_work {
+       struct ibtrs_session    *sess;
+       struct work_struct      work;
+};
+
+struct ibtrs_srv_sysfs_put_work {
+       struct work_struct      work;
+       struct ibtrs_session    *sess;
+};
+
+struct ssm_create_con_work {
+       struct ibtrs_session    *sess;
+       struct rdma_cm_id       *cm_id;
+       struct work_struct      work;
+       bool                    user;/* true if con is for user msg only */
+};
+
+struct ssm_work {
+       struct ibtrs_session    *sess;
+       enum ssm_ev             ev;
+       struct work_struct      work;
+};
+
+struct ibtrs_con {
+       /* list for ibtrs_session->con_list */
+       struct list_head        list;
+       enum csm_state          state;
+       /* true if con is for user msg only */
+       bool                    user;
+       bool                    failover_enabled;
+       struct ib_con           ib_con;
+       atomic_t                wr_cnt;
+       struct rdma_cm_id       *cm_id;
+       int                     cq_vector;
+       struct ibtrs_session    *sess;
+       struct work_struct      cq_work;
+       struct workqueue_struct *cq_wq;
+       struct workqueue_struct *rdma_resp_wq;
+       struct ib_wc            wcs[WC_ARRAY_SIZE];
+       bool                    device_being_removed;
+};
+
+struct csm_work {
+       struct ibtrs_con        *con;
+       enum csm_ev             ev;
+       struct work_struct      work;
+};
+
+struct msg_work {
+       struct work_struct      work;
+       struct ibtrs_con        *con;
+       void                    *msg;
+};
+
+struct ibtrs_device {
+       struct list_head        entry;
+       struct ib_device        *device;
+       struct ib_session       ib_sess;
+       struct completion       *ib_sess_destroy_completion;
+       struct kref             ref;
+};
+
+struct ibtrs_ops_id {
+       struct ibtrs_con                *con;
+       u32                             msg_id;
+       u8                              dir;
+       u64                             data_dma_addr;
+       struct ibtrs_msg_req_rdma_write *req;
+       struct ib_rdma_wr               *tx_wr;
+       struct ib_sge                   *tx_sg;
+       int                             status;
+       struct work_struct              work;
+} ____cacheline_aligned;
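+
+/*
+ * One ibtrs_ops_id per receive buffer (indexed by msg_id) carries the
+ * per-I/O state: for reads, the ib_rdma_wr/ib_sge arrays used to write the
+ * result back to the client; for both directions, the status the user
+ * module reports via ibtrs_srv_resp_rdma().
+ */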
+
+static void csm_set_state(struct ibtrs_con *con, enum csm_state s)
+{
+       if (con->state != s) {
+               DEB("changing con %p csm state from %s to %s\n", con,
+                   csm_state_str(con->state), csm_state_str(s));
+               con->state = s;
+       }
+}
+
+static void ssm_set_state(struct ibtrs_session *sess, enum ssm_state state)
+{
+       if (sess->state != state) {
+               DEB("changing sess %p ssm state from %s to %s\n", sess,
+                   ssm_state_str(sess->state), ssm_state_str(state));
+               sess->state = state;
+       }
+}
+
+static struct ibtrs_con *ibtrs_srv_get_user_con(struct ibtrs_session *sess)
+{
+       struct ibtrs_con *con;
+
+       if (sess->est_cnt > 0) {
+               list_for_each_entry(con, &sess->con_list, list) {
+                       if (con->user && con->state == CSM_STATE_CONNECTED)
+                               return con;
+               }
+       }
+       return NULL;
+}
+
+static void csm_init(struct ibtrs_con *con);
+static void csm_schedule_event(struct ibtrs_con *con, enum csm_ev ev);
+static int ssm_init(struct ibtrs_session *sess);
+static int ssm_schedule_event(struct ibtrs_session *sess, enum ssm_ev ev);
+
+static int ibtrs_srv_get_sess_current_port_num(struct ibtrs_session *sess)
+{
+       struct ibtrs_con *con, *next;
+       struct ibtrs_con *ucon = ibtrs_srv_get_user_con(sess);
+
+       if (sess->state != SSM_STATE_CONNECTED || !ucon)
+               return -ECOMM;
+
+       mutex_lock(&sess->lock);
+       if (WARN_ON(!sess->cm_id)) {
+               mutex_unlock(&sess->lock);
+               return -ENODEV;
+       }
+       list_for_each_entry_safe(con, next, &sess->con_list, list) {
+               if (unlikely(con->state != CSM_STATE_CONNECTED)) {
+                       mutex_unlock(&sess->lock);
+                       return -ECOMM;
+               }
+               if (con->cm_id->port_num != sess->cm_id->port_num) {
+                       mutex_unlock(&sess->lock);
+                       return 0;
+               }
+       }
+       mutex_unlock(&sess->lock);
+       return sess->cm_id->port_num;
+}
+
+int ibtrs_srv_current_hca_port_to_str(struct ibtrs_session *sess,
+                                     char *buf, size_t len)
+{
+       int port = ibtrs_srv_get_sess_current_port_num(sess);
+
+       if (!port)
+               return scnprintf(buf, len, "migrating\n");
+       if (port < 0)
+               return port;
+
+       return scnprintf(buf, len, "%u\n", port);
+}
+
+inline const char *ibtrs_srv_get_sess_hca_name(struct ibtrs_session *sess)
+{
+       struct ibtrs_con *con = ibtrs_srv_get_user_con(sess);
+
+       if (con)
+               return sess->dev->device->name;
+       return "n/a";
+}
+
+static void ibtrs_srv_update_rdma_stats(struct ibtrs_srv_stats *s,
+                                       size_t size, bool read)
+{
+       int inflight;
+
+       if (read) {
+               atomic64_inc(&s->rdma_stats.cnt_read);
+               atomic64_add(size, &s->rdma_stats.size_total_read);
+       } else {
+               atomic64_inc(&s->rdma_stats.cnt_write);
+               atomic64_add(size, &s->rdma_stats.size_total_write);
+       }
+
+       inflight = atomic_inc_return(&s->rdma_stats.inflight);
+       atomic64_add(inflight, &s->rdma_stats.inflight_total);
+}
+
+static inline void ibtrs_srv_stats_dec_inflight(struct ibtrs_session *sess)
+{
+       if (!atomic_dec_return(&sess->stats.rdma_stats.inflight))
+               wake_up(&sess->bufs_wait);
+}
+
+int ibtrs_srv_reset_rdma_stats(struct ibtrs_session *sess, bool enable)
+{
+       if (enable) {
+               struct ibtrs_srv_stats_rdma_stats *r = &sess->stats.rdma_stats;
+
+               /*
+                * TODO: inflight is used for flow control
+                * we can't memset the whole structure, so reset each member
+                */
+               atomic64_set(&r->cnt_read, 0);
+               atomic64_set(&r->size_total_read, 0);
+               atomic64_set(&r->cnt_write, 0);
+               atomic64_set(&r->size_total_write, 0);
+               atomic64_set(&r->inflight_total, 0);
+               return 0;
+       } else {
+               return -EINVAL;
+       }
+}
+
+ssize_t ibtrs_srv_stats_rdma_to_str(struct ibtrs_session *sess,
+                                   char *page, size_t len)
+{
+       struct ibtrs_srv_stats_rdma_stats *r = &sess->stats.rdma_stats;
+
+       return scnprintf(page, len, "%ld %ld %ld %ld %u %ld\n",
+                        atomic64_read(&r->cnt_read),
+                        atomic64_read(&r->size_total_read),
+                        atomic64_read(&r->cnt_write),
+                        atomic64_read(&r->size_total_write),
+                        atomic_read(&r->inflight),
+                        (atomic64_read(&r->cnt_read) +
+                         atomic64_read(&r->cnt_write)) ?
+                        atomic64_read(&r->inflight_total) /
+                        (atomic64_read(&r->cnt_read) +
+                         atomic64_read(&r->cnt_write)) : 0);
+}
+
+int ibtrs_srv_reset_user_ib_msgs_stats(struct ibtrs_session *sess, bool enable)
+{
+       if (enable) {
+               memset(&sess->stats.user_ib_msgs, 0,
+                      sizeof(sess->stats.user_ib_msgs));
+               return 0;
+       } else {
+               return -EINVAL;
+       }
+}
+
+int ibtrs_srv_stats_user_ib_msgs_to_str(struct ibtrs_session *sess, char *buf,
+                                       size_t len)
+{
+       return snprintf(buf, len, "%ld %ld %ld %ld\n",
+                       atomic64_read(&sess->stats.user_ib_msgs.recv_msg_cnt),
+                       atomic64_read(&sess->stats.user_ib_msgs.recv_size),
+                       atomic64_read(&sess->stats.user_ib_msgs.sent_msg_cnt),
+                       atomic64_read(&sess->stats.user_ib_msgs.sent_size));
+}
+
+int ibtrs_srv_reset_wc_completion_stats(struct ibtrs_session *sess,
+                                       bool enable)
+{
+       if (enable) {
+               memset(&sess->stats.wc_comp, 0, sizeof(sess->stats.wc_comp));
+               return 0;
+       } else {
+               return -EINVAL;
+       }
+}
+
+int ibtrs_srv_stats_wc_completion_to_str(struct ibtrs_session *sess, char *buf,
+                                        size_t len)
+{
+       return snprintf(buf, len, "%d %ld %ld\n",
+                       atomic_read(&sess->stats.wc_comp.max_wc_cnt),
+                       atomic64_read(&sess->stats.wc_comp.total_wc_cnt),
+                       atomic64_read(&sess->stats.wc_comp.calls));
+}
+
+ssize_t ibtrs_srv_reset_all_help(struct ibtrs_session *sess,
+                                char *page, size_t len)
+{
+       return scnprintf(page, PAGE_SIZE, "echo 1 to reset all statistics\n");
+}
+
+int ibtrs_srv_reset_all_stats(struct ibtrs_session *sess, bool enable)
+{
+       if (enable) {
+               ibtrs_srv_reset_wc_completion_stats(sess, enable);
+               ibtrs_srv_reset_user_ib_msgs_stats(sess, enable);
+               ibtrs_srv_reset_rdma_stats(sess, enable);
+               return 0;
+       } else {
+               return -EINVAL;
+       }
+}
+
+static inline bool srv_ops_are_valid(const struct ibtrs_srv_ops *ops)
+{
+       return ops && ops->sess_ev && ops->rdma_ev && ops->recv;
+}
+
+static int ibtrs_srv_sess_ev(struct ibtrs_session *sess,
+                            enum ibtrs_srv_sess_ev ev)
+{
+       if (!sess->session_announced_to_user &&
+           ev != IBTRS_SRV_SESS_EV_CONNECTED)
+               return 0;
+
+       if (ev == IBTRS_SRV_SESS_EV_CONNECTED)
+               sess->session_announced_to_user = true;
+
+       return srv_ops->sess_ev(sess, ev, sess->priv);
+}
+
+static void free_id(struct ibtrs_ops_id *id)
+{
+       if (!id)
+               return;
+       kfree(id->tx_wr);
+       kfree(id->tx_sg);
+       kvfree(id);
+}
+
+static void free_sess_tx_bufs(struct ibtrs_session *sess)
+{
+       struct ibtrs_iu *e, *next;
+       int i;
+
+       if (sess->rdma_info_iu) {
+               ibtrs_iu_free(sess->rdma_info_iu, DMA_TO_DEVICE,
+                             sess->dev->device);
+               sess->rdma_info_iu = NULL;
+       }
+
+       WARN_ON(sess->tx_bufs_used);
+       list_for_each_entry_safe(e, next, &sess->tx_bufs, list) {
+               list_del(&e->list);
+               ibtrs_iu_free(e, DMA_TO_DEVICE, sess->dev->device);
+       }
+
+       if (sess->ops_ids) {
+               for (i = 0; i < sess->queue_depth; i++)
+                       free_id(sess->ops_ids[i]);
+               kfree(sess->ops_ids);
+               sess->ops_ids = NULL;
+       }
+}
+
+static void put_tx_iu(struct ibtrs_session *sess, struct ibtrs_iu *iu)
+{
+       spin_lock(&sess->tx_bufs_lock);
+       ibtrs_iu_put(&sess->tx_bufs, iu);
+       sess->tx_bufs_used--;
+       spin_unlock(&sess->tx_bufs_lock);
+}
+
+static struct ibtrs_iu *get_tx_iu(struct ibtrs_session *sess)
+{
+       struct ibtrs_iu *iu;
+
+       spin_lock(&sess->tx_bufs_lock);
+       iu = ibtrs_iu_get(&sess->tx_bufs);
+       if (iu)
+               sess->tx_bufs_used++;
+       spin_unlock(&sess->tx_bufs_lock);
+
+       return iu;
+}
+
+static int rdma_write_sg(struct ibtrs_ops_id *id)
+{
+       int err, i, offset;
+       struct ib_send_wr *bad_wr;
+       struct ib_rdma_wr *wr = NULL;
+       struct ibtrs_session *sess = id->con->sess;
+
+       if (unlikely(id->req->sg_cnt == 0))
+               return -EINVAL;
+
+       offset = 0;
+       for (i = 0; i < id->req->sg_cnt; i++) {
+               struct ib_sge *list;
+
+               wr              = &id->tx_wr[i];
+               list            = &id->tx_sg[i];
+               list->addr      = id->data_dma_addr + offset;
+               list->length    = id->req->desc[i].len;
+
+               /* WR will fail with length error
+                * if this is 0
+                */
+               if (unlikely(list->length == 0)) {
+                       ERR(sess, "Invalid RDMA-Write sg list length 0\n");
+                       return -EINVAL;
+               }
+
+               list->lkey = sess->dev->ib_sess.pd->local_dma_lkey;
+               offset += list->length;
+
+               wr->wr.wr_id            = (uintptr_t)id;
+               wr->wr.sg_list          = list;
+               wr->wr.num_sge          = 1;
+               wr->remote_addr = id->req->desc[i].addr;
+               wr->rkey        = id->req->desc[i].key;
+
+               if (i < (id->req->sg_cnt - 1)) {
+                       wr->wr.next     = &id->tx_wr[i + 1].wr;
+                       wr->wr.opcode   = IB_WR_RDMA_WRITE;
+                       wr->wr.ex.imm_data      = 0;
+                       wr->wr.send_flags       = 0;
+               }
+       }
+
+       wr->wr.opcode   = IB_WR_RDMA_WRITE_WITH_IMM;
+       wr->wr.next     = NULL;
+       wr->wr.send_flags       = atomic_inc_return(&id->con->wr_cnt) %
+                               sess->queue_depth ? 0 : IB_SEND_SIGNALED;
+       wr->wr.ex.imm_data      = cpu_to_be32(id->msg_id << 16);
+
+       err = ib_post_send(id->con->ib_con.qp, &id->tx_wr[0].wr, &bad_wr);
+       if (unlikely(err))
+               ERR(sess,
+                   "Posting RDMA-Write-Request to QP failed, errno: %d\n",
+                   err);
+
+       return err;
+}
+
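+/*
+ * I/O responses are zero-length RDMA writes with immediate data: the msg_id
+ * goes in the upper 16 bits, an errno (if any) in the lower 16. Only every
+ * queue_depth-th work request is posted with IB_SEND_SIGNALED to keep the
+ * completion load low.
+ */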
+static int send_io_resp_imm(struct ibtrs_con *con, int msg_id, s16 errno)
+{
+       int err;
+
+       err = ibtrs_write_empty_imm(con->ib_con.qp, (msg_id << 16) | (u16)errno,
+                                   atomic_inc_return(&con->wr_cnt) %
+                                   con->sess->queue_depth ? 0 :
+                                   IB_SEND_SIGNALED);
+       if (unlikely(err))
+               ERR_RL(con->sess, "Posting RDMA-Write-Request to QP failed,"
+                      " errno: %d\n", err);
+
+       return err;
+}
+
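+/*
+ * Two immediate values are reserved for control traffic: UINT_MAX marks a
+ * heartbeat, UINT_MAX - 1 acks a user message (see ibtrs_send_usr_msg_ack()
+ * below); both are empty RDMA writes without payload.
+ */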
+static int send_heartbeat_raw(struct ibtrs_con *con)
+{
+       int err;
+
+       err = ibtrs_write_empty_imm(con->ib_con.qp, UINT_MAX, IB_SEND_SIGNALED);
+       if (unlikely(err)) {
+               ERR(con->sess,
+                   "Sending heartbeat failed, posting msg to QP failed,"
+                   " errno: %d\n", err);
+               return err;
+       }
+
+       ibtrs_heartbeat_set_send_ts(&con->sess->heartbeat);
+       return err;
+}
+
+static int send_heartbeat(struct ibtrs_session *sess)
+{
+       struct ibtrs_con *con;
+
+       if (unlikely(list_empty(&sess->con_list)))
+               return -ENOENT;
+
+       con = list_first_entry(&sess->con_list, struct ibtrs_con, list);
+       WARN_ON(!con->user);
+
+       if (unlikely(con->state != CSM_STATE_CONNECTED))
+               return -ENOTCONN;
+
+       return send_heartbeat_raw(con);
+}
+
+static int ibtrs_srv_queue_resp_rdma(struct ibtrs_ops_id *id)
+{
+       if (unlikely(id->con->state != CSM_STATE_CONNECTED)) {
+               ERR_RL(id->con->sess, "Sending I/O response failed,"
+                      " session is disconnected, sess state %s,"
+                      " con state %s\n", ssm_state_str(id->con->sess->state),
+                      csm_state_str(id->con->state));
+               return -ECOMM;
+       }
+
+       if (WARN_ON(!queue_work(id->con->rdma_resp_wq, &id->work))) {
+               ERR_RL(id->con->sess, "Sending I/O response failed,"
+                      " couldn't queue work\n");
+               return -EPERM;
+       }
+
+       return 0;
+}
+
+static void ibtrs_srv_resp_rdma_worker(struct work_struct *work)
+{
+       struct ibtrs_ops_id *id;
+       int err;
+       struct ibtrs_session *sess;
+
+       id = container_of(work, struct ibtrs_ops_id, work);
+       sess = id->con->sess;
+
+       if (id->status || id->dir == WRITE) {
+               DEB("err or write msg_id=%d, status=%d, sending response\n",
+                   id->msg_id, id->status);
+
+               err = send_io_resp_imm(id->con, id->msg_id, id->status);
+               if (unlikely(err)) {
+                       ERR_RL(sess, "Sending imm msg failed, errno: %d\n",
+                              err);
+                       if (err == -ENOMEM && !ibtrs_srv_queue_resp_rdma(id))
+                               return;
+                       csm_schedule_event(id->con, CSM_EV_CON_ERROR);
+               }
+
+               ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+               ibtrs_srv_stats_dec_inflight(sess);
+               return;
+       }
+
+       DEB("read req msg_id=%d completed, sending data\n", id->msg_id);
+       err = rdma_write_sg(id);
+       if (unlikely(err)) {
+               ERR_RL(sess, "Sending I/O read response failed, errno: %d\n",
+                      err);
+               if (err == -ENOMEM && !ibtrs_srv_queue_resp_rdma(id))
+                       return;
+               csm_schedule_event(id->con, CSM_EV_CON_ERROR);
+       }
+       ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+       ibtrs_srv_stats_dec_inflight(sess);
+}
+
+/*
+ * This function may be called from an interrupt context, e.g. on bio_endio
+ * callback on the user module. Queue the real work on a workqueue so we don't
+ * need to hold an irq spinlock.
+ */
+int ibtrs_srv_resp_rdma(struct ibtrs_ops_id *id, int status)
+{
+       int err = 0;
+
+       if (unlikely(!id)) {
+               ERR_NP("Sending I/O response failed, I/O ops id NULL\n");
+               return -EINVAL;
+       }
+
+       id->status = status;
+       INIT_WORK(&id->work, ibtrs_srv_resp_rdma_worker);
+
+       err = ibtrs_srv_queue_resp_rdma(id);
+       if (err)
+               ibtrs_srv_stats_dec_inflight(id->con->sess);
+       return err;
+}
+EXPORT_SYMBOL(ibtrs_srv_resp_rdma);
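+
+/*
+ * Sketch of a (hypothetical) caller: the user module completes the I/O in
+ * its own context and reports 0 on success or a negative errno on failure:
+ *
+ *	static void my_io_done(struct ibtrs_ops_id *id, int err)
+ *	{
+ *		ibtrs_srv_resp_rdma(id, err);
+ *	}
+ */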
+
+static bool ibtrs_srv_get_usr_msg_buf(struct ibtrs_session *sess)
+{
+       return atomic_dec_if_positive(&sess->peer_usr_msg_bufs) >= 0;
+}
+
+int ibtrs_srv_send(struct ibtrs_session *sess, const struct kvec *vec,
+                  size_t nr)
+{
+       struct ibtrs_iu *iu = NULL;
+       struct ibtrs_con *con;
+       struct ibtrs_msg_user *msg;
+       size_t len;
+       bool closed_st = false;
+       int err;
+
+       if (WARN_ONCE(list_empty(&sess->con_list),
+                     "Sending message failed, no connection available\n"))
+               return -ECOMM;
+       con = ibtrs_srv_get_user_con(sess);
+
+       if (unlikely(!con)) {
+               WRN(sess,
+                   "Sending message failed, no user connection exists\n");
+               return -ECOMM;
+       }
+
+       len = kvec_length(vec, nr);
+
+       if (unlikely(len + IBTRS_HDR_LEN > MAX_REQ_SIZE)) {
+               WRN_RL(sess, "Sending message failed, passed data too big,"
+                      " %zu > %lu\n", len, MAX_REQ_SIZE - IBTRS_HDR_LEN);
+               return -EMSGSIZE;
+       }
+
+       wait_event(sess->mu_buf_wait_q,
+                  (closed_st = (con->state != CSM_STATE_CONNECTED)) ||
+                  ibtrs_srv_get_usr_msg_buf(sess));
+
+       if (unlikely(closed_st)) {
+               ERR_RL(sess, "Sending message failed, not connected (state"
+                      " %s)\n", csm_state_str(con->state));
+               return -ECOMM;
+       }
+
+       wait_event(sess->mu_iu_wait_q,
+                  (closed_st = (con->state != CSM_STATE_CONNECTED)) ||
+                  (iu = get_tx_iu(sess)) != NULL);
+
+       if (unlikely(closed_st)) {
+               ERR_RL(sess, "Sending message failed, not connected (state"
+                      " %s)\n", csm_state_str(con->state));
+               err = -ECOMM;
+               goto err_iu;
+       }
+
+       msg             = iu->buf;
+       msg->hdr.type   = IBTRS_MSG_USER;
+       msg->hdr.tsize  = len + IBTRS_HDR_LEN;
+       copy_from_kvec(msg->payl, vec, len);
+
+       ibtrs_deb_msg_hdr("Sending: ", &msg->hdr);
+       err = ibtrs_post_send(con->ib_con.qp,
+                             con->sess->dev->ib_sess.pd->__internal_mr, iu,
+                             msg->hdr.tsize);
+       if (unlikely(err)) {
+               ERR_RL(sess, "Sending message failed, posting message to QP"
+                      " failed, errno: %d\n", err);
+               goto err_post_send;
+       }
+       ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+
+       atomic64_inc(&sess->stats.user_ib_msgs.sent_msg_cnt);
+       atomic64_add(len, &sess->stats.user_ib_msgs.sent_size);
+
+       return 0;
+
+err_post_send:
+       put_tx_iu(sess, iu);
+       wake_up(&con->sess->mu_iu_wait_q);
+err_iu:
+       atomic_inc(&sess->peer_usr_msg_bufs);
+       wake_up(&con->sess->mu_buf_wait_q);
+       return err;
+}
+EXPORT_SYMBOL(ibtrs_srv_send);
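+
+/*
+ * Usage sketch (hypothetical): a message is passed as a kvec array, e.g. a
+ * single buffer of len bytes:
+ *
+ *	struct kvec vec = { .iov_base = data, .iov_len = len };
+ *	int err = ibtrs_srv_send(sess, &vec, 1);
+ *
+ * The call may block until the peer acks earlier messages (returning a
+ * peer_usr_msg_bufs credit) and a tx buffer becomes available.
+ */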
+
+inline void ibtrs_srv_set_sess_priv(struct ibtrs_session *sess, void *priv)
+{
+       sess->priv = priv;
+}
+EXPORT_SYMBOL(ibtrs_srv_set_sess_priv);
+
+static int ibtrs_post_recv(struct ibtrs_con *con, struct ibtrs_iu *iu)
+{
+       struct ib_recv_wr wr, *bad_wr;
+       struct ib_sge list;
+       int err;
+
+       list.addr   = iu->dma_addr;
+       list.length = iu->size;
+       list.lkey   = con->sess->dev->ib_sess.pd->local_dma_lkey;
+
+       if (unlikely(list.length == 0)) {
+               ERR_RL(con->sess, "Posting recv buffer failed, invalid sg list"
+                      " length 0\n");
+               return -EINVAL;
+       }
+
+       wr.next     = NULL;
+       wr.wr_id    = (uintptr_t)iu;
+       wr.sg_list  = &list;
+       wr.num_sge  = 1;
+
+       err = ib_post_recv(con->ib_con.qp, &wr, &bad_wr);
+       if (unlikely(err))
+               ERR_RL(con->sess, "Posting recv buffer failed, errno: %d\n",
+                      err);
+
+       return err;
+}
+
+static struct ibtrs_rcv_buf_pool *alloc_rcv_buf_pool(void)
+{
+       struct ibtrs_rcv_buf_pool *pool;
+       struct page *cont_pages = NULL;
+       struct ibtrs_mem_chunk *mem_chunk;
+       int alloced_bufs = 0;
+       int rcv_buf_order = get_order(rcv_buf_size);
+       int max_order, alloc_order;
+       unsigned int alloced_size;
+
+       pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+       if (!pool) {
+               ERR_NP("Failed to allocate memory for buffer pool struct\n");
+               return NULL;
+       }
+
+       pool->rcv_bufs = kcalloc(sess_queue_depth, sizeof(*pool->rcv_bufs),
+                                GFP_KERNEL);
+       if (!pool->rcv_bufs) {
+               ERR_NP("Failed to allocate array for receive buffers\n");
+               kfree(pool);
+               return NULL;
+       }
+       INIT_LIST_HEAD(&pool->chunk_list);
+
+       while (alloced_bufs < sess_queue_depth) {
+               mem_chunk = kzalloc(sizeof(*mem_chunk), GFP_KERNEL);
+               if (!mem_chunk) {
+                       ERR_NP("Failed to allocate memory for memory chunk"
+                              " struct\n");
+                       goto alloc_fail;
+               }
+
+               /* reset from the previous iteration so a stale pointer is
+                * never mistaken for a freshly allocated chunk
+                */
+               cont_pages = NULL;
+               max_order = min(MAX_ORDER - 1,
+                               get_order((sess_queue_depth - alloced_bufs) *
+                                         rcv_buf_size));
+               for (alloc_order = max_order; alloc_order > rcv_buf_order;
+                    alloc_order--) {
+                       cont_pages = alloc_pages(__GFP_NORETRY | __GFP_NOWARN |
+                                                __GFP_ZERO, alloc_order);
+                       if (cont_pages) {
+                               DEB("Allocated order %d pages\n", alloc_order);
+                               break;
+                       }
+                       DEB("Failed to allocate order %d pages\n", alloc_order);
+               }
+
+               if (cont_pages) {
+                       void *recv_buf_start;
+
+                       mem_chunk->order = alloc_order;
+                       mem_chunk->addr = page_address(cont_pages);
+                       list_add_tail(&mem_chunk->list, &pool->chunk_list);
+                       alloced_size = (1 << alloc_order) * PAGE_SIZE;
+
+                       DEB("Memory chunk size: %d, address: %p\n",
+                           alloced_size, mem_chunk->addr);
+
+                       recv_buf_start = mem_chunk->addr;
+                       while (alloced_size >= rcv_buf_size &&
+                              alloced_bufs < sess_queue_depth) {
+                               pool->rcv_bufs[alloced_bufs].buf =
+                                       recv_buf_start;
+                               alloced_bufs++;
+                               recv_buf_start += rcv_buf_size;
+                               alloced_size -= rcv_buf_size;
+                       }
+               } else {
+                       /* allocating pages to fit multiple rcv_bufs failed,
+                        * so fall back to allocating the exact number of
+                        * pages for a single buffer
+                        */
+                       gfp_t gfp_mask = (GFP_KERNEL | __GFP_REPEAT |
+                                         __GFP_ZERO);
+                       void *addr = alloc_pages_exact(rcv_buf_size, gfp_mask);
+
+                       if (!addr) {
+                               ERR_NP("Failed to allocate memory for "
+                                      " receive buffer (size %dB)\n",
+                                      rcv_buf_size);
+                               goto alloc_fail;
+                       }
+
+                       DEB("Alloced pages exact at %p for rcv_bufs[%d]\n",
+                           addr, alloced_bufs);
+
+                       mem_chunk->addr = addr;
+                       mem_chunk->order = IBTRS_MEM_CHUNK_NOORDER;
+                       list_add_tail(&mem_chunk->list, &pool->chunk_list);
+
+                       pool->rcv_bufs[alloced_bufs].buf = addr;
+                       alloced_bufs++;
+               }
+       }
+
+       return pool;
+
+alloc_fail:
+       if (!list_empty(&pool->chunk_list)) {
+               struct ibtrs_mem_chunk *tmp;
+
+               list_for_each_entry_safe(mem_chunk, tmp, &pool->chunk_list,
+                                        list) {
+                       if (mem_chunk->order != IBTRS_MEM_CHUNK_NOORDER)
+                               free_pages((unsigned long)mem_chunk->addr,
+                                          mem_chunk->order);
+                       else
+                               free_pages_exact(mem_chunk->addr, rcv_buf_size);
+                       list_del(&mem_chunk->list);
+                       kfree(mem_chunk);
+               }
+       }
+       kfree(pool->rcv_bufs);
+       kfree(pool);
+       return NULL;
+}
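+
+/*
+ * With the default sess_queue_depth of 512 and 132KB buffers a pool comes
+ * to roughly 66MB per session, which is why pools are recycled through the
+ * free list below instead of being freed on every disconnect.
+ */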
+
+static struct ibtrs_rcv_buf_pool *__get_pool_from_list(void)
+{
+       struct ibtrs_rcv_buf_pool *pool = NULL;
+
+       if (!list_empty(&free_buf_pool_list)) {
+               DEB("Getting buf pool from pre-allocated list\n");
+               pool = list_first_entry(&free_buf_pool_list,
+                                       struct ibtrs_rcv_buf_pool, list);
+               list_del(&pool->list);
+               nr_free_buf_pool--;
+       }
+
+       return pool;
+}
+
+static void __put_pool_on_list(struct ibtrs_rcv_buf_pool *pool)
+{
+       list_add(&pool->list, &free_buf_pool_list);
+       nr_free_buf_pool++;
+       DEB("Put buf pool back to the free list (nr_free_buf_pool: %d)\n",
+           nr_free_buf_pool);
+}
+
+static struct ibtrs_rcv_buf_pool *get_alloc_rcv_buf_pool(void)
+{
+       struct ibtrs_rcv_buf_pool *pool = NULL;
+
+       mutex_lock(&buf_pool_mutex);
+       if (nr_active_sessions >= pool_size_hi_wm) {
+               WARN_ON(nr_free_buf_pool || !list_empty(&free_buf_pool_list));
+               DEB("nr_active_sessions (%d) >= pool_size_hi_wm (%d),"
+                   " allocating.\n", nr_active_sessions, pool_size_hi_wm);
+               pool = alloc_rcv_buf_pool();
+       } else if (nr_total_buf_pool < pool_size_hi_wm) {
+               /* try to allocate a new pool while used+free is less than
+                * the watermark
+                */
+               DEB("nr_total_buf_pool (%d) smaller than pool_size_hi_wm (%d)"
+                   ", trying to allocate.\n", nr_total_buf_pool,
+                   pool_size_hi_wm);
+               pool = alloc_rcv_buf_pool();
+               if (pool)
+                       nr_total_buf_pool++;
+               else
+                       pool = __get_pool_from_list();
+       } else if (nr_total_buf_pool == pool_size_hi_wm) {
+               /* pool size has already reached watermark, check if there are
+                * free pools on the list
+                */
+               if (nr_free_buf_pool) {
+                       pool = __get_pool_from_list();
+                       WARN_ON(!pool);
+                       DEB("Got pool from free list (nr_free_buf_pool: %d)\n",
+                           nr_free_buf_pool);
+               } else {
+                       /* all pools are already being used */
+                       DEB("No free pool on the list\n");
+                       WARN_ON((nr_active_sessions != nr_total_buf_pool) ||
+                               nr_free_buf_pool);
+                       pool = alloc_rcv_buf_pool();
+               }
+       } else {
+               /* all possibilities should be covered */
+               WARN_ON(1);
+       }
+
+       if (pool)
+               nr_active_sessions++;
+
+       mutex_unlock(&buf_pool_mutex);
+
+       return pool;
+}
+
+static void free_recv_buf_pool(struct ibtrs_rcv_buf_pool *pool)
+{
+       struct ibtrs_mem_chunk *mem_chunk, *tmp;
+
+       DEB("Freeing memory chunks for %d receive buffers\n", sess_queue_depth);
+
+       list_for_each_entry_safe(mem_chunk, tmp, &pool->chunk_list, list) {
+               if (mem_chunk->order != IBTRS_MEM_CHUNK_NOORDER)
+                       free_pages((unsigned long)mem_chunk->addr,
+                                  mem_chunk->order);
+               else
+                       free_pages_exact(mem_chunk->addr, rcv_buf_size);
+               list_del(&mem_chunk->list);
+               kfree(mem_chunk);
+       }
+
+       kfree(pool->rcv_bufs);
+       kfree(pool);
+}
+
+static void put_rcv_buf_pool(struct ibtrs_rcv_buf_pool *pool)
+{
+       mutex_lock(&buf_pool_mutex);
+       nr_active_sessions--;
+       if (nr_active_sessions >= pool_size_hi_wm) {
+               mutex_unlock(&buf_pool_mutex);
+               DEB("Freeing buf pool"
+                   " (nr_active_sessions: %d, pool_size_hi_wm: %d)\n",
+                   nr_active_sessions, pool_size_hi_wm);
+               free_recv_buf_pool(pool);
+       } else {
+               __put_pool_on_list(pool);
+               mutex_unlock(&buf_pool_mutex);
+       }
+}
+
+static void unreg_cont_bufs(struct ibtrs_session *sess)
+{
+       struct ibtrs_rcv_buf *buf;
+       int i;
+
+       DEB("Unregistering %d RDMA buffers\n", sess_queue_depth);
+       for (i = 0; i < sess_queue_depth; i++) {
+               buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+               ib_dma_unmap_single(sess->dev->device, buf->rdma_addr,
+                                   rcv_buf_size, DMA_BIDIRECTIONAL);
+       }
+}
+
+static void release_cont_bufs(struct ibtrs_session *sess)
+{
+       unreg_cont_bufs(sess);
+       put_rcv_buf_pool(sess->rcv_buf_pool);
+       sess->rcv_buf_pool = NULL;
+}
+
+static int setup_cont_bufs(struct ibtrs_session *sess)
+{
+       struct ibtrs_rcv_buf *buf;
+       int i, err;
+
+       sess->rcv_buf_pool = get_alloc_rcv_buf_pool();
+       if (!sess->rcv_buf_pool) {
+               ERR(sess, "Failed to allocate receive buffers for session\n");
+               return -ENOMEM;
+       }
+
+       DEB("Mapping %d buffers for RDMA\n", sess->queue_depth);
+       for (i = 0; i < sess->queue_depth; i++) {
+               buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+               buf->rdma_addr = ib_dma_map_single(sess->dev->device, buf->buf,
+                                                  rcv_buf_size,
+                                                  DMA_BIDIRECTIONAL);
+               if (unlikely(ib_dma_mapping_error(sess->dev->device,
+                                                 buf->rdma_addr))) {
+                       ERR_NP("Registering RDMA buf failed,"
+                              " DMA mapping failed\n");
+                       err = -EIO;
+                       goto err_map;
+               }
+       }
+
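+       /*
+        * The client addresses a buffer through the 32-bit immediate of its
+        * RDMA write: the low off_len bits carry an offset within the
+        * buffer, the remaining high bits the buffer id.
+        */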
+       sess->off_len = 31 - ilog2(sess->queue_depth - 1);
+       sess->off_mask = (1 << sess->off_len) - 1;
+
+       INFO(sess, "Allocated %d %dKB RDMA receive buffers, %dKB in total\n",
+            sess->queue_depth, rcv_buf_size >> 10,
+            sess->queue_depth * rcv_buf_size >> 10);
+
+       return 0;
+
+err_map:
+       for (i = 0; i < sess->queue_depth; i++) {
+               buf = &sess->rcv_buf_pool->rcv_bufs[i];
+
+               if (buf->rdma_addr &&
+                   !ib_dma_mapping_error(sess->dev->device, buf->rdma_addr))
+                       ib_dma_unmap_single(sess->dev->device, buf->rdma_addr,
+                                           rcv_buf_size, DMA_BIDIRECTIONAL);
+       }
+       return err;
+}
+
+static void fill_ibtrs_msg_sess_open_resp(struct ibtrs_msg_sess_open_resp *msg,
+                                         struct ibtrs_con *con)
+{
+       int i;
+
+       msg->hdr.type   = IBTRS_MSG_SESS_OPEN_RESP;
+       msg->hdr.tsize  = IBTRS_MSG_SESS_OPEN_RESP_LEN(con->sess->queue_depth);
+
+       msg->ver = con->sess->ver;
+       strlcpy(msg->hostname, hostname, sizeof(msg->hostname));
+       msg->cnt = con->sess->queue_depth;
+       msg->rkey = con->sess->dev->ib_sess.pd->unsafe_global_rkey;
+       msg->max_inflight_msg = con->sess->queue_depth;
+       msg->max_io_size = max_io_size;
+       msg->max_req_size = MAX_REQ_SIZE;
+       for (i = 0; i < con->sess->queue_depth; i++)
+               msg->addr[i] = con->sess->rcv_buf_pool->rcv_bufs[i].rdma_addr;
+}
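+
+/*
+ * The open response advertises the DMA address of every receive buffer
+ * together with the PD's global rkey, so the client can RDMA-write requests
+ * and data straight into server memory without per-I/O negotiation.
+ */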
+
+static void free_sess_rx_bufs(struct ibtrs_session *sess)
+{
+       int i;
+
+       if (sess->dummy_rx_iu) {
+               ibtrs_iu_free(sess->dummy_rx_iu, DMA_FROM_DEVICE,
+                             sess->dev->device);
+               sess->dummy_rx_iu = NULL;
+       }
+
+       if (sess->usr_rx_ring) {
+               for (i = 0; i < USR_CON_BUF_SIZE; ++i)
+                       if (sess->usr_rx_ring[i])
+                               ibtrs_iu_free(sess->usr_rx_ring[i],
+                                             DMA_FROM_DEVICE,
+                                             sess->dev->device);
+               kfree(sess->usr_rx_ring);
+               sess->usr_rx_ring = NULL;
+       }
+}
+
+static int alloc_sess_tx_bufs(struct ibtrs_session *sess)
+{
+       struct ibtrs_iu *iu;
+       struct ibtrs_ops_id *id;
+       struct ib_device *ib_dev = sess->dev->device;
+       int i;
+
+       sess->rdma_info_iu =
+               ibtrs_iu_alloc(0, IBTRS_MSG_SESS_OPEN_RESP_LEN(
+                              sess->queue_depth), GFP_KERNEL, ib_dev,
+                              DMA_TO_DEVICE, true);
+       if (unlikely(!sess->rdma_info_iu)) {
+               ERR_RL(sess, "Can't allocate transfer buffer for "
+                            "sess open resp\n");
+               return -ENOMEM;
+       }
+
+       sess->ops_ids = kcalloc(sess->queue_depth, sizeof(*sess->ops_ids),
+                               GFP_KERNEL);
+       if (unlikely(!sess->ops_ids)) {
+               ERR_RL(sess, "Can't alloc ops_ids for the session\n");
+               goto err;
+       }
+
+       for (i = 0; i < sess->queue_depth; ++i) {
+               id = ibtrs_zalloc(sizeof(*id));
+               if (unlikely(!id)) {
+                       ERR_RL(sess, "Can't alloc ops id for session\n");
+                       goto err;
+               }
+               sess->ops_ids[i] = id;
+       }
+
+       for (i = 0; i < USR_MSG_CNT; ++i) {
+               iu = ibtrs_iu_alloc(i, MAX_REQ_SIZE, GFP_KERNEL,
+                                   ib_dev, DMA_TO_DEVICE, true);
+               if (!iu) {
+                       ERR_RL(sess, "Can't alloc tx bufs for user msgs\n");
+                       goto err;
+               }
+               list_add(&iu->list, &sess->tx_bufs);
+       }
+
+       return 0;
+
+err:
+       free_sess_tx_bufs(sess);
+       return -ENOMEM;
+}
+
+static int alloc_sess_rx_bufs(struct ibtrs_session *sess)
+{
+       int i;
+
+       sess->dummy_rx_iu =
+               ibtrs_iu_alloc(0, IBTRS_HDR_LEN, GFP_KERNEL, sess->dev->device,
+                              DMA_FROM_DEVICE, true);
+       if (!sess->dummy_rx_iu) {
+               ERR(sess, "Failed to allocate dummy IU to receive "
+                         "immediate messages on io connections\n");
+               goto err;
+       }
+
+       sess->usr_rx_ring = kcalloc(USR_CON_BUF_SIZE,
+                                   sizeof(*sess->usr_rx_ring), GFP_KERNEL);
+       if (!sess->usr_rx_ring) {
+               ERR(sess, "Alloc usr_rx_ring for session failed\n");
+               goto err;
+       }
+
+       for (i = 0; i < USR_CON_BUF_SIZE; ++i) {
+               sess->usr_rx_ring[i] =
+                       ibtrs_iu_alloc(i, MAX_REQ_SIZE, GFP_KERNEL,
+                                      sess->dev->device, DMA_FROM_DEVICE,
+                                      true);
+               if (!sess->usr_rx_ring[i]) {
+                       ERR(sess, "Failed to allocate iu for usr_rx_ring\n");
+                       goto err;
+               }
+       }
+
+       return 0;
+
+err:
+       free_sess_rx_bufs(sess);
+       return -ENOMEM;
+}
+
+static int alloc_sess_bufs(struct ibtrs_session *sess)
+{
+       int err;
+
+       err = alloc_sess_rx_bufs(sess);
+       if (err)
+               return err;
+       else
+               return alloc_sess_tx_bufs(sess);
+}
+
+static int post_io_con_recv(struct ibtrs_con *con)
+{
+       int i, ret;
+
+       for (i = 0; i < con->sess->queue_depth; i++) {
+               ret = ibtrs_post_recv(con, con->sess->dummy_rx_iu);
+               if (unlikely(ret))
+                       return ret;
+       }
+
+       return 0;
+}
+
+static int post_user_con_recv(struct ibtrs_con *con)
+{
+       int i, ret;
+
+       for (i = 0; i < USR_CON_BUF_SIZE; i++) {
+               struct ibtrs_iu *iu = con->sess->usr_rx_ring[i];
+
+               ret = ibtrs_post_recv(con, iu);
+               if (unlikely(ret))
+                       return ret;
+       }
+
+       return 0;
+}
+
+static int post_recv(struct ibtrs_con *con)
+{
+       if (con->user)
+               return post_user_con_recv(con);
+
+       return post_io_con_recv(con);
+}
+
+static void free_sess_bufs(struct ibtrs_session *sess)
+{
+       free_sess_rx_bufs(sess);
+       free_sess_tx_bufs(sess);
+}
+
+static int init_transfer_bufs(struct ibtrs_con *con)
+{
+       int err;
+       struct ibtrs_session *sess = con->sess;
+
+       if (con->user) {
+               err = alloc_sess_bufs(sess);
+               if (err) {
+                       ERR(sess, "Alloc sess bufs failed: %d\n", err);
+                       return err;
+               }
+       }
+
+       return post_recv(con);
+}
+
+static void process_rdma_write_req(struct ibtrs_con *con,
+                                  struct ibtrs_msg_req_rdma_write *req,
+                                  u32 buf_id, u32 off)
+{
+       int ret;
+       struct ibtrs_ops_id *id;
+       struct ibtrs_session *sess = con->sess;
+
+       if (unlikely(sess->state != SSM_STATE_CONNECTED ||
+                    con->state != CSM_STATE_CONNECTED)) {
+               ERR_RL(sess, "Processing RDMA-Write-Req failed,"
+                      " session is disconnected, sess state %s,"
+                      " con state %s\n", ssm_state_str(sess->state),
+                      csm_state_str(con->state));
+               return;
+       }
+       ibtrs_srv_update_rdma_stats(&sess->stats, off, true);
+       id = sess->ops_ids[buf_id];
+       kfree(id->tx_wr);
+       kfree(id->tx_sg);
+       id->con         = con;
+       id->dir         = READ;
+       id->msg_id      = buf_id;
+       id->req         = req;
+       id->tx_wr       = kcalloc(req->sg_cnt, sizeof(*id->tx_wr), GFP_KERNEL);
+       id->tx_sg       = kcalloc(req->sg_cnt, sizeof(*id->tx_sg), GFP_KERNEL);
+       if (!id->tx_wr || !id->tx_sg) {
+               ERR_RL(sess, "Processing RDMA-Write-Req failed, work request "
+                      "or scatter gather allocation failed for msg_id %d\n",
+                      buf_id);
+               ret = -ENOMEM;
+               goto send_err_msg;
+       }
+
+       id->data_dma_addr = sess->rcv_buf_pool->rcv_bufs[buf_id].rdma_addr;
+       ret = srv_ops->rdma_ev(con->sess, sess->priv, id,
+                              IBTRS_SRV_RDMA_EV_WRITE_REQ,
+                              sess->rcv_buf_pool->rcv_bufs[buf_id].buf, off);
+
+       if (unlikely(ret)) {
+               ERR_RL(sess, "Processing RDMA-Write-Req failed, user "
+                      "module cb reported for msg_id %d, errno: %d\n",
+                      buf_id, ret);
+               goto send_err_msg;
+       }
+
+       return;
+
+send_err_msg:
+       ret = send_io_resp_imm(con, buf_id, ret);
+       if (ret < 0) {
+               ERR_RL(sess, "Sending err msg for failed RDMA-Write-Req"
+                      " failed, msg_id %d, errno: %d\n", buf_id, ret);
+               csm_schedule_event(con, CSM_EV_CON_ERROR);
+       }
+       ibtrs_srv_stats_dec_inflight(sess);
+}
+
+static void process_rdma_write(struct ibtrs_con *con,
+                              struct ibtrs_msg_rdma_write *req,
+                              u32 buf_id, u32 off)
+{
+       int ret;
+       struct ibtrs_ops_id *id;
+       struct ibtrs_session *sess = con->sess;
+
+       if (unlikely(sess->state != SSM_STATE_CONNECTED ||
+                    con->state != CSM_STATE_CONNECTED)) {
+               ERR_RL(sess, "Processing RDMA-Write failed,"
+                      " session is disconnected, sess state %s,"
+                      " con state %s\n", ssm_state_str(sess->state),
+                      csm_state_str(con->state));
+               return;
+       }
+       ibtrs_srv_update_rdma_stats(&sess->stats, off, false);
+       id = con->sess->ops_ids[buf_id];
+       id->con    = con;
+       id->dir    = WRITE;
+       id->msg_id = buf_id;
+
+       ret = srv_ops->rdma_ev(sess, sess->priv, id, IBTRS_SRV_RDMA_EV_RECV,
+                              sess->rcv_buf_pool->rcv_bufs[buf_id].buf, off);
+       if (unlikely(ret)) {
+               ERR_RL(sess, "Processing RDMA-Write failed, user module"
+                      " callback reports errno: %d\n", ret);
+               goto send_err_msg;
+       }
+
+       return;
+
+send_err_msg:
+       ret = send_io_resp_imm(con, buf_id, ret);
+       if (ret < 0) {
+               ERR_RL(sess, "Processing RDMA-Write failed, sending I/O"
+                      " response failed, msg_id %d, errno: %d\n",
+                      buf_id, ret);
+               csm_schedule_event(con, CSM_EV_CON_ERROR);
+       }
+       ibtrs_srv_stats_dec_inflight(sess);
+}
+
+static int ibtrs_send_usr_msg_ack(struct ibtrs_con *con)
+{
+       struct ibtrs_session *sess;
+       int err;
+
+       sess = con->sess;
+
+       if (unlikely(con->state != CSM_STATE_CONNECTED)) {
+               ERR_RL(sess, "Sending user msg ack failed, disconnected"
+                       " Connection state is %s\n", csm_state_str(con->state));
+               return -ECOMM;
+       }
+       DEB("Sending user message ack\n");
+       err = ibtrs_write_empty_imm(con->ib_con.qp, UINT_MAX - 1,
+                                   IB_SEND_SIGNALED);
+       if (unlikely(err)) {
+               ERR_RL(sess, "Sending user Ack msg failed, errno: %d\n", err);
+               return err;
+       }
+
+       ibtrs_heartbeat_set_send_ts(&sess->heartbeat);
+       return 0;
+}
+
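+/*
+ * Deliver a received user message to the registered user module and
+ * account it in the session statistics.
+ */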
+static void process_msg_user(struct ibtrs_con *con,
+                            struct ibtrs_msg_user *msg)
+{
+       int len;
+       struct ibtrs_session *sess = con->sess;
+
+       len = msg->hdr.tsize - IBTRS_HDR_LEN;
+       if (unlikely(sess->state < SSM_STATE_CONNECTED || !sess->priv)) {
+               ERR_RL(sess, "Sending user msg failed, session isn't ready."
+                       " Session state is %s\n", ssm_state_str(sess->state));
+               return;
+       }
+
+       srv_ops->recv(sess, sess->priv, msg->payl, len);
+
+       atomic64_inc(&sess->stats.user_ib_msgs.recv_msg_cnt);
+       atomic64_add(len, &sess->stats.user_ib_msgs.recv_size);
+}
+
+static void process_msg_user_ack(struct ibtrs_con *con)
+{
+       struct ibtrs_session *sess = con->sess;
+
+       atomic_inc(&sess->peer_usr_msg_bufs);
+       wake_up(&con->sess->mu_buf_wait_q);
+}
+
+static void ibtrs_handle_write(struct ibtrs_con *con, struct ibtrs_iu *iu,
+                              struct ibtrs_msg_hdr *hdr, u32 id, u32 off)
+{
+       struct ibtrs_session *sess = con->sess;
+       int ret;
+
+       if (unlikely(ibtrs_validate_message(sess->queue_depth, hdr))) {
+               ERR(sess,
+                   "Processing I/O failed, message validation failed\n");
+               ret = ibtrs_post_recv(con, iu);
+               if (unlikely(ret != 0))
+                       ERR(sess,
+                           "Failed to post receive buffer to HCA, errno: %d\n",
+                           ret);
+               goto err;
+       }
+
+       DEB("recv completion, type 0x%02x, tag %u, id %u, off %u\n",
+           hdr->type, iu->tag, id, off);
+       print_hex_dump_debug("", DUMP_PREFIX_OFFSET, 8, 1,
+                            hdr, IBTRS_HDR_LEN + 32, true);
+       ret = ibtrs_post_recv(con, iu);
+       if (unlikely(ret != 0)) {
+               ERR(sess, "Posting receive buffer to HCA failed, errno: %d\n",
+                   ret);
+               goto err;
+       }
+
+       switch (hdr->type) {
+       case IBTRS_MSG_RDMA_WRITE:
+               process_rdma_write(con, (struct ibtrs_msg_rdma_write *)hdr,
+                                  id, off);
+               break;
+       case IBTRS_MSG_REQ_RDMA_WRITE:
+               process_rdma_write_req(con,
+                                      (struct ibtrs_msg_req_rdma_write *)hdr,
+                                      id, off);
+               break;
+       default:
+               ERR(sess, "Processing I/O request failed, "
+                   "unknown message type received: 0x%02x\n", hdr->type);
+               goto err;
+       }
+
+       return;
+
+err:
+       csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void msg_worker(struct work_struct *work)
+{
+       struct msg_work *w;
+       struct ibtrs_con *con;
+       struct ibtrs_msg_user *msg;
+
+       w = container_of(work, struct msg_work, work);
+       con = w->con;
+       msg = w->msg;
+       kvfree(w);
+       process_msg_user(con, msg);
+       kvfree(msg);
+}
+
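+/*
+ * Copy the user message out of the receive IU and defer its delivery to
+ * the per-session ordered message workqueue, so that the IU can be
+ * reposted to the HCA immediately.
+ */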
+static int ibtrs_schedule_msg(struct ibtrs_con *con,
+                             struct ibtrs_msg_user *msg)
+{
+       struct msg_work *w;
+
+       w = ibtrs_malloc(sizeof(*w));
+       if (!w)
+               return -ENOMEM;
+
+       w->con = con;
+       w->msg = ibtrs_malloc(msg->hdr.tsize);
+       if (!w->msg) {
+               kvfree(w);
+               return -ENOMEM;
+       }
+       memcpy(w->msg, msg, msg->hdr.tsize);
+       INIT_WORK(&w->work, msg_worker);
+       queue_work(con->sess->msg_wq, &w->work);
+       return 0;
+}
+
+static void ibtrs_handle_recv(struct ibtrs_con *con, struct ibtrs_iu *iu)
+{
+       struct ibtrs_msg_hdr *hdr;
+       struct ibtrs_msg_sess_info *req;
+       struct ibtrs_session *sess = con->sess;
+       int ret;
+       u8 type;
+
+       hdr = (struct ibtrs_msg_hdr *)iu->buf;
+       if (unlikely(ibtrs_validate_message(sess->queue_depth, hdr)))
+               goto err1;
+
+       type = hdr->type;
+
+       DEB("recv completion, type 0x%02x, tag %u\n",
+           type, iu->tag);
+       print_hex_dump_debug("", DUMP_PREFIX_OFFSET, 8, 1,
+                            iu->buf, IBTRS_HDR_LEN, true);
+
+       switch (type) {
+       case IBTRS_MSG_USER:
+               ret = ibtrs_schedule_msg(con, iu->buf);
+               if (unlikely(ret)) {
+                       ERR_RL(sess, "Scheduling worker of user message "
+                              "to user module failed, errno: %d\n", ret);
+                       goto err1;
+               }
+               ret = ibtrs_post_recv(con, iu);
+               if (unlikely(ret)) {
+                       ERR_RL(sess, "Posting receive buffer of user message "
+                              "to HCA failed, errno: %d\n", ret);
+                       goto err2;
+               }
+               ret = ibtrs_send_usr_msg_ack(con);
+               if (unlikely(ret)) {
+                       ERR_RL(sess, "Sending ACK for user message failed, "
+                              "errno: %d\n", ret);
+                       goto err2;
+               }
+               return;
+       case IBTRS_MSG_SESS_INFO:
+               ret = ibtrs_post_recv(con, iu);
+               if (unlikely(ret)) {
+                       ERR_RL(sess, "Posting receive buffer of sess info "
+                              "to HCA failed, errno: %d\n", ret);
+                       goto err2;
+               }
+               req = (struct ibtrs_msg_sess_info *)hdr;
+               strlcpy(sess->hostname, req->hostname, sizeof(sess->hostname));
+               return;
+       default:
+               ERR(sess, "Processing received message failed, "
+                   "unknown type: 0x%02x\n", type);
+               goto err1;
+       }
+
+err1:
+       ibtrs_post_recv(con, iu);
+err2:
+       ERR(sess, "Failed to process IBTRS message\n");
+       csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void add_con_to_list(struct ibtrs_session *sess, struct ibtrs_con *con)
+{
+       mutex_lock(&sess->lock);
+       list_add_tail(&con->list, &sess->con_list);
+       mutex_unlock(&sess->lock);
+}
+
+static void remove_con_from_list(struct ibtrs_con *con)
+{
+       if (WARN_ON(!con->sess))
+               return;
+       mutex_lock(&con->sess->lock);
+       list_del(&con->list);
+       mutex_unlock(&con->sess->lock);
+}
+
+static void close_con(struct ibtrs_con *con)
+{
+       struct ibtrs_session *sess = con->sess;
+
+       DEB("Closing connection %p\n", con);
+
+       if (con->user)
+               cancel_delayed_work(&sess->send_heartbeat_dwork);
+
+       cancel_work_sync(&con->cq_work);
+       destroy_workqueue(con->rdma_resp_wq);
+
+       ib_con_destroy(&con->ib_con);
+       if (!con->user && !con->device_being_removed)
+               rdma_destroy_id(con->cm_id);
+
+       destroy_workqueue(con->cq_wq);
+
+       if (con->user) {
+               /* notify possible user msg ACK thread waiting for a tx iu or
+                * user msg buffer so they can check the connection state, give
+                * up waiting and put back any tx_iu reserved
+                */
+               wake_up(&sess->mu_buf_wait_q);
+               wake_up(&sess->mu_iu_wait_q);
+               destroy_workqueue(sess->msg_wq);
+       }
+
+       con->sess->active_cnt--;
+}
+
+static void destroy_con(struct ibtrs_con *con)
+{
+       remove_con_from_list(con);
+       kvfree(con);
+}
+
+static void destroy_sess(struct kref *kref)
+{
+       struct ibtrs_session *sess = container_of(kref, struct ibtrs_session,
+                                                 kref);
+       struct ibtrs_con *con, *con_next;
+
+       if (sess->cm_id)
+               rdma_destroy_id(sess->cm_id);
+
+       destroy_workqueue(sess->sm_wq);
+
+       list_for_each_entry_safe(con, con_next, &sess->con_list, list)
+               destroy_con(con);
+
+       mutex_lock(&sess_mutex);
+       list_del(&sess->list);
+       mutex_unlock(&sess_mutex);
+       wake_up(&sess_list_waitq);
+
+       INFO(sess, "Session is closed\n");
+       kvfree(sess);
+}
+
+int ibtrs_srv_sess_get(struct ibtrs_session *sess)
+{
+       return kref_get_unless_zero(&sess->kref);
+}
+
+void ibtrs_srv_sess_put(struct ibtrs_session *sess)
+{
+       kref_put(&sess->kref, destroy_sess);
+}
+
+static void sess_put_worker(struct work_struct *work)
+{
+       struct sess_put_work *w = container_of(work, struct sess_put_work,
+                                              work);
+
+       ibtrs_srv_sess_put(w->sess);
+       kvfree(w);
+}
+
+static void schedule_sess_put(struct ibtrs_session *sess)
+{
+       struct sess_put_work *w;
+
+       while (true) {
+               w = ibtrs_malloc(sizeof(*w));
+               if (w)
+                       break;
+               cond_resched();
+       }
+
+       /* Since we can be closing this session from a session workqueue,
+        * we need to schedule another work on the global workqueue to put the
+        * session, which can destroy the session workqueue and free the
+        * session.
+        */
+       w->sess = sess;
+       INIT_WORK(&w->work, sess_put_worker);
+       queue_work(destroy_wq, &w->work);
+}
+
+static void ibtrs_srv_sysfs_put_worker(struct work_struct *work)
+{
+       struct ibtrs_srv_sysfs_put_work *w;
+
+       w = container_of(work, struct ibtrs_srv_sysfs_put_work, work);
+       kobject_put(&w->sess->kobj_stats);
+       kobject_put(&w->sess->kobj);
+
+       kvfree(w);
+}
+
+static void ibtrs_srv_schedule_sysfs_put(struct ibtrs_session *sess)
+{
+       struct ibtrs_srv_sysfs_put_work *w = ibtrs_malloc(sizeof(*w));
+
+       if (WARN_ON(!w))
+               return;
+
+       w->sess = sess;
+
+       INIT_WORK(&w->work, ibtrs_srv_sysfs_put_worker);
+       queue_work(destroy_wq, &w->work);
+}
+
+static void ibtrs_free_dev(struct kref *ref)
+{
+       struct ibtrs_device *ndev =
+               container_of(ref, struct ibtrs_device, ref);
+
+       mutex_lock(&device_list_mutex);
+       list_del(&ndev->entry);
+       mutex_unlock(&device_list_mutex);
+       ib_session_destroy(&ndev->ib_sess);
+       if (ndev->ib_sess_destroy_completion)
+               complete_all(ndev->ib_sess_destroy_completion);
+       kfree(ndev);
+}
+
+static struct ibtrs_device *
+ibtrs_find_get_device(struct rdma_cm_id *cm_id)
+{
+       struct ibtrs_device *ndev;
+       int err;
+
+       mutex_lock(&device_list_mutex);
+       list_for_each_entry(ndev, &device_list, entry) {
+               if (ndev->device->node_guid == cm_id->device->node_guid &&
+                   kref_get_unless_zero(&ndev->ref))
+                       goto out_unlock;
+       }
+
+       ndev = kzalloc(sizeof(*ndev), GFP_KERNEL);
+       if (!ndev)
+               goto out_err;
+
+       ndev->device = cm_id->device;
+       kref_init(&ndev->ref);
+
+       err = ib_session_init(cm_id->device, &ndev->ib_sess);
+       if (err)
+               goto out_free;
+
+       list_add(&ndev->entry, &device_list);
+       DEB("added %s.\n", ndev->device->name);
+out_unlock:
+       mutex_unlock(&device_list_mutex);
+       return ndev;
+
+out_free:
+       kfree(ndev);
+out_err:
+       mutex_unlock(&device_list_mutex);
+       return NULL;
+}
+
+static void ibtrs_srv_destroy_ib_session(struct ibtrs_session *sess)
+{
+       release_cont_bufs(sess);
+       free_sess_bufs(sess);
+       kref_put(&sess->dev->ref, ibtrs_free_dev);
+}
+
+static void process_err_wc(struct ibtrs_con *con, struct ib_wc *wc)
+{
+       struct ibtrs_iu *iu;
+
+       if (wc->wr_id == (uintptr_t)&con->ib_con.beacon) {
+               DEB("beacon received for con %p\n", con);
+               csm_schedule_event(con, CSM_EV_BEACON_COMPLETED);
+               return;
+       }
+
+       /* Only wc->wr_id is guaranteed to be correct in erroneous WCs;
+        * we can't rely on wc->opcode, so use iu->direction to determine
+        * whether it is a tx or an rx IU.
+        */
+       iu = (struct ibtrs_iu *)wc->wr_id;
+       if (iu && iu->direction == DMA_TO_DEVICE &&
+           iu != con->sess->rdma_info_iu)
+               put_tx_iu(con->sess, iu);
+
+       if (wc->status != IB_WC_WR_FLUSH_ERR ||
+           (con->state != CSM_STATE_CLOSING &&
+            con->state != CSM_STATE_FLUSHING)) {
+               /* suppress flush errors when the connection has
+                * just called rdma_disconnect() and is in the
+                * CLOSING or FLUSHING state waiting to be torn
+                * down
+                */
+               ERR_RL(con->sess, "%s (wr_id: 0x%llx,"
+                      " type: %s, vendor_err: 0x%x, len: %u)\n",
+                      ib_wc_status_msg(wc->status), wc->wr_id,
+                      ib_wc_opcode_str(wc->opcode),
+                      wc->vendor_err, wc->byte_len);
+       }
+       csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
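+/*
+ * Dispatch a batch of work completions.  The immediate data of an
+ * incoming RDMA write encodes either a heartbeat (UINT_MAX), a user
+ * message ACK (UINT_MAX - 1) or the receive buffer id and offset of an
+ * I/O request (id in the upper bits, offset in the lower off_len bits).
+ */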
+static int process_wcs(struct ibtrs_con *con, struct ib_wc *wcs, size_t len)
+{
+       int i, ret;
+       struct ibtrs_iu *iu;
+       struct ibtrs_session *sess = con->sess;
+
+       for (i = 0; i < len; i++) {
+               struct ib_wc wc = wcs[i];
+
+               if (unlikely(wc.status != IB_WC_SUCCESS)) {
+                       process_err_wc(con, &wc);
+                       continue;
+               }
+
+               /* DEB("cq complete with wr_id 0x%llx, len %u "
+                *  "status %d (%s) type %d (%s)\n", wc.wr_id,
+                *  wc.byte_len, wc.status, ib_wc_status_msg(wc.status),
+                *  wc.opcode, ib_wc_opcode_str(wc.opcode));
+                */
+
+               switch (wc.opcode) {
+               case IB_WC_SEND:
+                       iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+                       if (iu == con->sess->rdma_info_iu)
+                               break;
+                       put_tx_iu(sess, iu);
+                       if (con->user)
+                               wake_up(&sess->mu_iu_wait_q);
+                       break;
+
+               case IB_WC_RECV_RDMA_WITH_IMM: {
+                       u32 imm, id, off;
+                       struct ibtrs_msg_hdr *hdr;
+
+                       ibtrs_set_last_heartbeat(&sess->heartbeat);
+
+                       iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+                       imm = be32_to_cpu(wc.ex.imm_data);
+                       if (imm == UINT_MAX) {
+                               ret = ibtrs_post_recv(con, iu);
+                               if (unlikely(ret != 0)) {
+                                       ERR(sess, "post receive buffer failed,"
+                                           " errno: %d\n", ret);
+                                       return ret;
+                               }
+                               break;
+                       } else if (imm == UINT_MAX - 1) {
+                               ret = ibtrs_post_recv(con, iu);
+                               if (unlikely(ret))
+                                       ERR_RL(sess, "Posting receive buffer of"
+                                              " user Ack msg to HCA failed,"
+                                              " errno: %d\n", ret);
+                               process_msg_user_ack(con);
+                               break;
+                       }
+                       id = imm >> sess->off_len;
+                       off = imm & sess->off_mask;
+
+                       if (id >= sess->queue_depth || off >= rcv_buf_size) {
+                               ERR(sess, "Processing I/O failed, contiguous "
+                                   "buf addr is out of reserved area\n");
+                               ret = ibtrs_post_recv(con, iu);
+                               if (unlikely(ret != 0))
+                                       ERR(sess, "Processing I/O failed, "
+                                           "post receive buffer failed, "
+                                           "errno: %d\n", ret);
+                               return -EIO;
+                       }
+
+                       hdr = (struct ibtrs_msg_hdr *)
+                               (sess->rcv_buf_pool->rcv_bufs[id].buf + off);
+
+                       ibtrs_handle_write(con, iu, hdr, id, off);
+                       break;
+               }
+
+               case IB_WC_RDMA_WRITE:
+                       break;
+
+               case IB_WC_RECV: {
+                       struct ibtrs_msg_hdr *hdr;
+
+                       ibtrs_set_last_heartbeat(&sess->heartbeat);
+                       iu = (struct ibtrs_iu *)(uintptr_t)wc.wr_id;
+                       hdr = (struct ibtrs_msg_hdr *)iu->buf;
+                       ibtrs_deb_msg_hdr("Received: ", hdr);
+                       ibtrs_handle_recv(con, iu);
+                       break;
+               }
+
+               default:
+                       ERR(sess, "Processing work completion failed,"
+                           " WC has unknown opcode: %s\n",
+                           ib_wc_opcode_str(wc.opcode));
+                       return -EINVAL;
+               }
+       }
+       return 0;
+}
+
+static void ibtrs_srv_update_wc_stats(struct ibtrs_con *con, int cnt)
+{
+       int old_max = atomic_read(&con->sess->stats.wc_comp.max_wc_cnt);
+       int act_max;
+
+       while (cnt > old_max) {
+               act_max = atomic_cmpxchg(&con->sess->stats.wc_comp.max_wc_cnt,
+                                        old_max, cnt);
+               if (likely(act_max == old_max))
+                       break;
+               old_max = act_max;
+       }
+
+       atomic64_inc(&con->sess->stats.wc_comp.calls);
+       atomic64_add(cnt, &con->sess->stats.wc_comp.total_wc_cnt);
+}
+
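+/* Poll the CQ in batches of up to ARRAY_SIZE(con->wcs) until it is drained. */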
+static int get_process_wcs(struct ibtrs_con *con, int *total_cnt)
+{
+       int cnt, err;
+
+       do {
+               cnt = ib_poll_cq(con->ib_con.cq, ARRAY_SIZE(con->wcs),
+                                con->wcs);
+               if (unlikely(cnt < 0)) {
+                       ERR(con->sess, "Polling completion queue failed, "
+                           "errno: %d\n", cnt);
+                       return cnt;
+               }
+
+               if (likely(cnt > 0)) {
+                       err = process_wcs(con, con->wcs, cnt);
+                       *total_cnt += cnt;
+                       if (unlikely(err))
+                               return err;
+               }
+       } while (cnt > 0);
+
+       return 0;
+}
+
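+/*
+ * Process pending completions, then re-arm CQ notifications.  If
+ * ib_req_notify_cq() reports missed events, poll again before leaving,
+ * otherwise completions could be lost.
+ */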
+static void wrapper_handle_cq_comp(struct work_struct *work)
+{
+       int err;
+       struct ibtrs_con *con = container_of(work, struct ibtrs_con, cq_work);
+       struct ibtrs_session *sess = con->sess;
+       int total_cnt = 0;
+
+       if (unlikely(con->state == CSM_STATE_CLOSED)) {
+               ERR(sess, "Retrieving work completions from completion"
+                   " queue failed, connection is disconnected\n");
+               goto error;
+       }
+
+       err = get_process_wcs(con, &total_cnt);
+       if (unlikely(err))
+               goto error;
+
+       while ((err = ib_req_notify_cq(con->ib_con.cq, IB_CQ_NEXT_COMP |
+                                      IB_CQ_REPORT_MISSED_EVENTS)) > 0) {
+               DEB("Missed %d CQ notifications, processing missed WCs...\n",
+                   err);
+               err = get_process_wcs(con, &total_cnt);
+               if (unlikely(err))
+                       goto error;
+       }
+
+       if (unlikely(err))
+               goto error;
+
+       ibtrs_srv_update_wc_stats(con, total_cnt);
+       return;
+
+error:
+       csm_schedule_event(con, CSM_EV_CON_ERROR);
+}
+
+static void cq_event_handler(struct ib_cq *cq, void *ctx)
+{
+       struct ibtrs_con *con = ctx;
+
+       /* queue_work() can return false here: the work may already be
+        * queued when CQ notifications were already activated and are
+        * activated again after the beacon was posted.
+        */
+       if (con->state != CSM_STATE_CLOSED)
+               queue_work(con->cq_wq, &con->cq_work);
+}
+
+static int accept(struct ibtrs_con *con)
+{
+       struct rdma_conn_param conn_param;
+       int ret;
+       struct ibtrs_session *sess = con->sess;
+
+       memset(&conn_param, 0, sizeof(conn_param));
+       conn_param.retry_count = retry_count;
+
+       if (con->user)
+               conn_param.rnr_retry_count = 7;
+
+       ret = rdma_accept(con->cm_id, &conn_param);
+       if (ret) {
+               ERR(sess, "Accepting RDMA connection request failed,"
+                   " errno: %d\n", ret);
+               return ret;
+       }
+
+       return 0;
+}
+
+static struct ibtrs_session *
+__create_sess(struct rdma_cm_id *cm_id, const struct ibtrs_msg_sess_open *req)
+{
+       struct ibtrs_session *sess;
+       int err;
+
+       sess = ibtrs_zalloc(sizeof(*sess));
+       if (!sess) {
+               err = -ENOMEM;
+               goto out;
+       }
+
+       err = ibtrs_addr_to_str(&cm_id->route.addr.dst_addr, sess->addr,
+                               sizeof(sess->addr));
+       if (err < 0)
+               goto err1;
+
+       sess->est_cnt = 0;
+       sess->state_in_sysfs = false;
+       sess->cur_cq_vector = -1;
+       INIT_LIST_HEAD(&sess->con_list);
+       mutex_init(&sess->lock);
+
+       INIT_LIST_HEAD(&sess->tx_bufs);
+       spin_lock_init(&sess->tx_bufs_lock);
+
+       err = ib_get_max_wr_queue_size(cm_id->device);
+       if (err < 0)
+               goto err1;
+
+       sess->wq_size = err - 1;
+
+       sess->queue_depth               = sess_queue_depth;
+       sess->con_cnt                   = req->con_cnt;
+       sess->ver                       = min_t(u8, req->ver, IBTRS_VERSION);
+       sess->primary_port_num          = cm_id->port_num;
+
+       init_waitqueue_head(&sess->mu_iu_wait_q);
+       init_waitqueue_head(&sess->mu_buf_wait_q);
+       ibtrs_set_heartbeat_timeout(&sess->heartbeat,
+                                   default_heartbeat_timeout_ms <
+                                   MIN_HEARTBEAT_TIMEOUT_MS ?
+                                   MIN_HEARTBEAT_TIMEOUT_MS :
+                                   default_heartbeat_timeout_ms);
+       atomic64_set(&sess->heartbeat.send_ts_ms, 0);
+       atomic64_set(&sess->heartbeat.recv_ts_ms, 0);
+       sess->heartbeat.addr = sess->addr;
+       sess->heartbeat.hostname = sess->hostname;
+
+       atomic_set(&sess->peer_usr_msg_bufs, USR_MSG_CNT);
+       sess->dev = ibtrs_find_get_device(cm_id);
+       if (!sess->dev) {
+               err = -ENOMEM;
+               WRN(sess, "Failed to alloc ibtrs_device\n");
+               goto err1;
+       }
+       err = setup_cont_bufs(sess);
+       if (err)
+               goto err2;
+
+       memcpy(sess->uuid, req->uuid, IBTRS_UUID_SIZE);
+       err = ssm_init(sess);
+       if (err) {
+               WRN(sess, "Failed to initialize the session state machine\n");
+               goto err3;
+       }
+
+       kref_init(&sess->kref);
+       init_waitqueue_head(&sess->bufs_wait);
+
+       list_add(&sess->list, &sess_list);
+       INFO(sess, "IBTRS Session created (queue depth: %d)\n",
+            sess->queue_depth);
+
+       return sess;
+
+err3:
+       release_cont_bufs(sess);
+err2:
+       kref_put(&sess->dev->ref, ibtrs_free_dev);
+err1:
+       kvfree(sess);
+out:
+       return ERR_PTR(err);
+}
+
+inline const char *ibtrs_srv_get_sess_hostname(struct ibtrs_session *sess)
+{
+       return sess->hostname;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_hostname);
+
+inline const char *ibtrs_srv_get_sess_addr(struct ibtrs_session *sess)
+{
+       return sess->addr;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_addr);
+
+inline int ibtrs_srv_get_sess_qdepth(struct ibtrs_session *sess)
+{
+       return sess->queue_depth;
+}
+EXPORT_SYMBOL(ibtrs_srv_get_sess_qdepth);
+
+static struct ibtrs_session *__find_active_sess(const char *uuid)
+{
+       struct ibtrs_session *n;
+
+       list_for_each_entry(n, &sess_list, list) {
+               if (!memcmp(n->uuid, uuid, sizeof(n->uuid)) &&
+                   n->state != SSM_STATE_CLOSING &&
+                   n->state != SSM_STATE_CLOSED)
+                       return n;
+       }
+
+       return NULL;
+}
+
+static int rdma_con_reject(struct rdma_cm_id *cm_id, s16 errnum)
+{
+       struct ibtrs_msg_error msg;
+       int ret;
+
+       memset(&msg, 0, sizeof(msg));
+       msg.hdr.type    = IBTRS_MSG_ERROR;
+       msg.hdr.tsize   = sizeof(msg);
+       msg.errno       = errnum;
+
+       ret = rdma_reject(cm_id, &msg, sizeof(msg));
+       if (ret)
+               ERR_NP("Rejecting RDMA connection request failed, errno: %d\n",
+                      ret);
+
+       return ret;
+}
+
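+/* Cycle through the CPUs in cq_affinity_mask to spread CQ vectors. */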
+static int find_next_bit_ring(int cur)
+{
+       int v = cpumask_next(cur, &cq_affinity_mask);
+
+       if (v >= nr_cpu_ids)
+               v = cpumask_first(&cq_affinity_mask);
+       return v;
+}
+
+static int ibtrs_srv_get_next_cq_vector(struct ibtrs_session *sess)
+{
+       sess->cur_cq_vector = find_next_bit_ring(sess->cur_cq_vector);
+
+       return sess->cur_cq_vector;
+}
+
+static void ssm_create_con_worker(struct work_struct *work)
+{
+       struct ssm_create_con_work *ssm_w =
+                       container_of(work, struct ssm_create_con_work, work);
+       struct ibtrs_session *sess = ssm_w->sess;
+       struct rdma_cm_id *cm_id = ssm_w->cm_id;
+       bool user = ssm_w->user;
+       struct ibtrs_con *con;
+       int ret;
+       u16 cq_size, wr_queue_size;
+
+       kvfree(ssm_w);
+
+       if (sess->state == SSM_STATE_CLOSING ||
+           sess->state == SSM_STATE_CLOSED) {
+               WRN(sess, "Creating connection failed, "
+                   "session is being closed\n");
+               ret = -ECOMM;
+               goto err_reject;
+       }
+
+       con = ibtrs_zalloc(sizeof(*con));
+       if (!con) {
+               ERR(sess, "Creating connection failed, "
+                   "can't allocate memory for connection\n");
+               ret = -ENOMEM;
+               goto err_reject;
+       }
+
+       con->cm_id                      = cm_id;
+       con->sess                       = sess;
+       con->user                       = user;
+       con->device_being_removed       = false;
+
+       atomic_set(&con->wr_cnt, 0);
+       if (con->user) {
+               cq_size         = USR_CON_BUF_SIZE + 1;
+               wr_queue_size   = USR_CON_BUF_SIZE + 1;
+       } else {
+               cq_size         = con->sess->queue_depth;
+               wr_queue_size   = sess->wq_size;
+       }
+
+       con->cq_vector = ibtrs_srv_get_next_cq_vector(sess);
+
+       con->ib_con.addr = sess->addr;
+       con->ib_con.hostname = sess->hostname;
+       ret = ib_con_init(&con->ib_con, con->cm_id,
+                         1, cq_event_handler, con, con->cq_vector, cq_size,
+                         wr_queue_size, &con->sess->dev->ib_sess);
+       if (ret)
+               goto err_init;
+
+       INIT_WORK(&con->cq_work, wrapper_handle_cq_comp);
+       if (con->user)
+               con->cq_wq = alloc_ordered_workqueue("%s",
+                                                    WQ_HIGHPRI,
+                                                    "ibtrs_srv_wq");
+       else
+               con->cq_wq = alloc_workqueue("%s",
+                                            WQ_CPU_INTENSIVE | WQ_HIGHPRI, 0,
+                                            "ibtrs_srv_wq");
+       if (!con->cq_wq) {
+               ret = -ENOMEM;
+               ERR(sess, "Creating connection failed, can't allocate "
+                   "work queue for completion queue, errno: %d\n", ret);
+               goto err_wq1;
+       }
+       }
+
+       con->rdma_resp_wq = alloc_workqueue("%s", 0, WQ_HIGHPRI,
+                                           "ibtrs_rdma_resp");
+
+       if (!con->rdma_resp_wq) {
+               ERR(sess, "Creating connection failed, can't allocate"
+                   " work queue for send response, errno: %d\n", ret);
+               goto err_wq2;
+       }
+
+       ret = init_transfer_bufs(con);
+       if (ret) {
+               ERR(sess, "Creating connection failed, can't init"
+                   " transfer buffers, errno: %d\n", ret);
+               goto err_buf;
+       }
+
+       csm_init(con);
+       add_con_to_list(sess, con);
+
+       cm_id->context = con;
+       if (con->user) {
+               con->sess->msg_wq = alloc_ordered_workqueue("sess_msg_wq", 0);
+               if (!con->sess->msg_wq) {
+                       ERR(con->sess, "Failed to create user message"
+                           " workqueue\n");
+                       ret = -ENOMEM;
+                       goto err_accept;
+               }
+       }
+
+       DEB("accept request\n");
+       ret = accept(con);
+       if (ret)
+               goto err_msg;
+
+       if (con->user)
+               con->sess->cm_id = cm_id;
+
+       con->sess->active_cnt++;
+
+       return;
+err_msg:
+       if (con->user)
+               destroy_workqueue(con->sess->msg_wq);
+err_accept:
+       cm_id->context = NULL;
+       remove_con_from_list(con);
+err_buf:
+       destroy_workqueue(con->rdma_resp_wq);
+err_wq2:
+       destroy_workqueue(con->cq_wq);
+err_wq1:
+       ib_con_destroy(&con->ib_con);
+err_init:
+       kvfree(con);
+err_reject:
+       rdma_destroy_id(cm_id);
+
+       ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+}
+
+static int ssm_schedule_create_con(struct ibtrs_session *sess,
+                                  struct rdma_cm_id *cm_id,
+                                  bool user)
+{
+       struct ssm_create_con_work *w;
+
+       w = ibtrs_malloc(sizeof(*w));
+       if (!w)
+               return -ENOMEM;
+
+       w->sess         = sess;
+       w->cm_id        = cm_id;
+       w->user         = user;
+       INIT_WORK(&w->work, ssm_create_con_worker);
+       queue_work(sess->sm_wq, &w->work);
+
+       return 0;
+}
+
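+/*
+ * Handle an RDMA connect request: a sess_open message creates a new
+ * session, a con_open message adds another connection to the active
+ * session identified by the client-supplied uuid.
+ */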
+static int rdma_con_establish(struct rdma_cm_id *cm_id, const void *data,
+                             size_t size)
+{
+       struct ibtrs_session *sess;
+       int ret;
+       const char *uuid = NULL;
+       const struct ibtrs_msg_hdr *hdr = data;
+       bool user = false;
+
+       if (unlikely(!srv_ops_are_valid(srv_ops))) {
+               ERR_NP("Establishing connection failed, "
+                      "no user module registered!\n");
+               ret = -ECOMM;
+               goto err_reject;
+       }
+
+       if (unlikely((size < sizeof(struct ibtrs_msg_con_open)) ||
+                    (size < sizeof(struct ibtrs_msg_sess_open)) ||
+                    ibtrs_validate_message(0, hdr))) {
+               ERR_NP("Establishing connection failed, "
+                      "connection request payload size unexpected "
+                      "%zu != %lu or %lu\n", size,
+                      sizeof(struct ibtrs_msg_con_open),
+                      sizeof(struct ibtrs_msg_sess_open));
+               ret = -EINVAL;
+               goto err_reject;
+       }
+
+       if (hdr->type == IBTRS_MSG_SESS_OPEN)
+               uuid = ((struct ibtrs_msg_sess_open *)data)->uuid;
+       else if (hdr->type == IBTRS_MSG_CON_OPEN)
+               uuid = ((struct ibtrs_msg_con_open *)data)->uuid;
+
+       mutex_lock(&sess_mutex);
+       sess = __find_active_sess(uuid);
+       if (sess) {
+               if (unlikely(hdr->type == IBTRS_MSG_SESS_OPEN)) {
+                       INFO(sess, "Connection request rejected, "
+                            "session already exists\n");
+                       mutex_unlock(&sess_mutex);
+                       ret = -EEXIST;
+                       goto err_reject;
+               }
+               if (!ibtrs_srv_sess_get(sess)) {
+                       INFO(sess, "Connection request rejected,"
+                            " session is being closed\n");
+                       mutex_unlock(&sess_mutex);
+                       ret = -EINVAL;
+                       goto err_reject;
+               }
+       } else {
+               if (unlikely(hdr->type == IBTRS_MSG_CON_OPEN)) {
+                       mutex_unlock(&sess_mutex);
+                       INFO_NP("Connection request rejected,"
+                               " received con_open msg but no active session"
+                               " exists.\n");
+                       ret = -EINVAL;
+                       goto err_reject;
+               }
+
+               sess = __create_sess(cm_id, (struct ibtrs_msg_sess_open *)data);
+               if (IS_ERR(sess)) {
+                       mutex_unlock(&sess_mutex);
+                       ret = PTR_ERR(sess);
+                       ERR_NP("Establishing connection failed, "
+                              "creating local session resource failed, errno:"
+                              " %d\n", ret);
+                       goto err_reject;
+               }
+               ibtrs_srv_sess_get(sess);
+               user = true;
+       }
+
+       mutex_unlock(&sess_mutex);
+
+       ret = ssm_schedule_create_con(sess, cm_id, user);
+       if (ret) {
+               ERR(sess, "Unable to schedule creation of connection,"
+                   " session will be closed.\n");
+               goto err_close;
+       }
+
+       ibtrs_srv_sess_put(sess);
+       return 0;
+
+err_close:
+       ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+       ibtrs_srv_sess_put(sess);
+err_reject:
+       rdma_con_reject(cm_id, ret);
+       return ret;
+}
+
+static int ibtrs_srv_rdma_cm_ev_handler(struct rdma_cm_id *cm_id,
+                                       struct rdma_cm_event *event)
+{
+       struct ibtrs_con *con = cm_id->context;
+       int ret = 0;
+
+       DEB("cma_event type %d cma_id %p(%s) on con: %p\n", event->event,
+           cm_id, rdma_event_msg(event->event), con);
+       if (!con && event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
+               INFO_NP("Ignore cma_event type %d cma_id %p(%s)\n",
+                       event->event, cm_id, rdma_event_msg(event->event));
+               return 0;
+       }
+
+       switch (event->event) {
+       case RDMA_CM_EVENT_CONNECT_REQUEST:
+               ret = rdma_con_establish(cm_id, event->param.conn.private_data,
+                                        event->param.conn.private_data_len);
+               break;
+       case RDMA_CM_EVENT_ESTABLISHED:
+               csm_schedule_event(con, CSM_EV_CON_ESTABLISHED);
+               break;
+       case RDMA_CM_EVENT_DISCONNECTED:
+       case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+               csm_schedule_event(con, CSM_EV_CON_DISCONNECTED);
+               break;
+
+       case RDMA_CM_EVENT_DEVICE_REMOVAL: {
+               struct completion dc;
+
+               ERR_RL(con->sess,
+                      "IB Device was removed, disconnecting session.\n");
+
+               con->device_being_removed = true;
+               init_completion(&dc);
+               con->sess->dev->ib_sess_destroy_completion = &dc;
+
+               csm_schedule_event(con, CSM_EV_DEVICE_REMOVAL);
+               wait_for_completion(&dc);
+
+               /* If it's user connection, the cm_id will be destroyed by
+                * destroy_sess(), so return 0 to signal that we will destroy
+                * it later. Otherwise, return 1 so CMA will destroy it.
+                */
+               if (con->user)
+                       return 0;
+               else
+                       return 1;
+       }
+       case RDMA_CM_EVENT_CONNECT_ERROR:
+       case RDMA_CM_EVENT_ROUTE_ERROR:
+       case RDMA_CM_EVENT_UNREACHABLE:
+       case RDMA_CM_EVENT_ADDR_CHANGE:
+               ERR_RL(con->sess, "CM error (CM event: %s, errno: %d)\n",
+                      rdma_event_msg(event->event), event->status);
+
+               csm_schedule_event(con, CSM_EV_CON_ERROR);
+               break;
+       case RDMA_CM_EVENT_REJECTED:
+               /* reject status is defined in enum, not errno */
+               ERR_RL(con->sess,
+                      "Connection rejected (CM event: %s, err: %s)\n",
+                      rdma_event_msg(event->event),
+                      rdma_reject_msg(cm_id, event->status));
+               csm_schedule_event(con, CSM_EV_CON_ERROR);
+               break;
+       default:
+               WRN(con->sess, "Ignoring unexpected CM event %s, errno %d\n",
+                   rdma_event_msg(event->event), event->status);
+               break;
+       }
+       return ret;
+}
+
+static int ibtrs_srv_cm_init(struct rdma_cm_id **cm_id, struct sockaddr *addr,
+                            enum rdma_port_space ps)
+{
+       int ret;
+
+       *cm_id = rdma_create_id(&init_net, ibtrs_srv_rdma_cm_ev_handler, NULL,
+                               ps, IB_QPT_RC);
+       if (IS_ERR(*cm_id)) {
+               ret = PTR_ERR(*cm_id);
+               ERR_NP("Creating id for RDMA connection failed, errno: %d\n",
+                      ret);
+               goto err_out;
+       }
+       DEB("created cm_id %p\n", *cm_id);
+       ret = rdma_bind_addr(*cm_id, addr);
+       if (ret) {
+               ERR_NP("Binding RDMA address failed, errno: %d\n", ret);
+               goto err_cm;
+       }
+       DEB("rdma_bind_addr successful\n");
+       /* we currently accept 64 rdma_connects */
+       ret = rdma_listen(*cm_id, 64);
+       if (ret) {
+               ERR_NP("Listening on RDMA connection failed, errno: %d\n", ret);
+               goto err_cm;
+       }
+
+       switch (addr->sa_family) {
+       case AF_INET:
+               DEB("listening on port %u\n",
+                   ntohs(((struct sockaddr_in *)addr)->sin_port));
+               break;
+       case AF_INET6:
+               DEB("listening on port %u\n",
+                   ntohs(((struct sockaddr_in6 *)addr)->sin6_port));
+               break;
+       case AF_IB:
+               DEB("listening on service id 0x%016llx\n",
+                   be64_to_cpu(rdma_get_service_id(*cm_id, addr)));
+               break;
+       default:
+               DEB("listening on address family %u\n", addr->sa_family);
+       }
+
+       return 0;
+
+err_cm:
+       rdma_destroy_id(*cm_id);
+err_out:
+       return ret;
+}
+
+static int ibtrs_srv_rdma_init(void)
+{
+       int ret = 0;
+       struct sockaddr_in6 sin = {
+               .sin6_family    = AF_INET6,
+               .sin6_addr      = IN6ADDR_ANY_INIT,
+               .sin6_port      = htons(IBTRS_SERVER_PORT),
+       };
+       struct sockaddr_ib sib = {
+               .sib_family                     = AF_IB,
+               .sib_addr.sib_subnet_prefix     = 0ULL,
+               .sib_addr.sib_interface_id      = 0ULL,
+               .sib_sid        = cpu_to_be64(RDMA_IB_IP_PS_IB |
+                                             IBTRS_SERVER_PORT),
+               .sib_sid_mask   = cpu_to_be64(0xffffffffffffffffULL),
+               .sib_pkey       = cpu_to_be16(0xffff),
+       };
+
+       /*
+        * We accept both IPoIB and IB connections, so we need to keep
+        * two cm id's, one for each socket type and port space.
+        * If the cm initialization of one of the id's fails, we abort
+        * everything.
+        */
+
+       ret = ibtrs_srv_cm_init(&cm_id_ip, (struct sockaddr *)&sin,
+                               RDMA_PS_TCP);
+       if (ret)
+               return ret;
+
+       ret = ibtrs_srv_cm_init(&cm_id_ib, (struct sockaddr *)&sib, RDMA_PS_IB);
+       if (ret)
+               goto err_cm_ib;
+
+       return ret;
+
+err_cm_ib:
+       rdma_destroy_id(cm_id_ip);
+       return ret;
+}
+
+static void ibtrs_srv_destroy_buf_pool(void)
+{
+       struct ibtrs_rcv_buf_pool *pool, *pool_next;
+
+       mutex_lock(&buf_pool_mutex);
+       list_for_each_entry_safe(pool, pool_next, &free_buf_pool_list, list) {
+               list_del(&pool->list);
+               nr_free_buf_pool--;
+               free_recv_buf_pool(pool);
+       }
+       mutex_unlock(&buf_pool_mutex);
+}
+
+static void ibtrs_srv_alloc_ini_buf_pool(void)
+{
+       struct ibtrs_rcv_buf_pool *pool;
+       int i;
+
+       if (init_pool_size == 0)
+               return;
+
+       INFO_NP("Trying to allocate RDMA buffers pool for %d client(s)\n",
+               init_pool_size);
+       for (i = 0; i < init_pool_size; i++) {
+               pool = alloc_rcv_buf_pool();
+               if (!pool) {
+                       ERR_NP("Failed to allocate initial RDMA buffer pool"
+                              " #%d\n", i + 1);
+                       break;
+               }
+               mutex_lock(&buf_pool_mutex);
+               list_add(&pool->list, &free_buf_pool_list);
+               nr_free_buf_pool++;
+               nr_total_buf_pool++;
+               mutex_unlock(&buf_pool_mutex);
+               DEB("Allocated buffer pool #%d\n", i);
+       }
+
+       INFO_NP("Allocated RDMA buffers pool for %d client(s)\n", i);
+}
+
+int ibtrs_srv_register(const struct ibtrs_srv_ops *ops)
+{
+       int err;
+
+       if (srv_ops) {
+               ERR_NP("Registration failed, module %s already registered,"
+                      " only 1 user module supported\n",
+               srv_ops->owner->name);
+               return -ENOTSUPP;
+       }
+
+       if (unlikely(!srv_ops_are_valid(ops))) {
+               ERR_NP("Registration failed, user module supploed invalid ops"
+                      " parameter\n");
+               return -EFAULT;
+       }
+
+       ibtrs_srv_alloc_ini_buf_pool();
+
+       err = ibtrs_srv_rdma_init();
+       if (err) {
+               ERR_NP("Can't init RDMA resource, errno: %d\n", err);
+               ibtrs_srv_destroy_buf_pool();
+               return err;
+       }
+       srv_ops = ops;
+
+       return 0;
+}
+EXPORT_SYMBOL(ibtrs_srv_register);
+
+inline void ibtrs_srv_queue_close(struct ibtrs_session *sess)
+{
+       ssm_schedule_event(sess, SSM_EV_SYSFS_DISCONNECT);
+}
+
+static void close_sessions(void)
+{
+       struct ibtrs_session *sess;
+
+       mutex_lock(&sess_mutex);
+       list_for_each_entry(sess, &sess_list, list) {
+               if (!ibtrs_srv_sess_get(sess))
+                       continue;
+               ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+               ibtrs_srv_sess_put(sess);
+       }
+       mutex_unlock(&sess_mutex);
+
+       wait_event(sess_list_waitq, list_empty(&sess_list));
+}
+
+void ibtrs_srv_unregister(const struct ibtrs_srv_ops *ops)
+{
+       if (!srv_ops) {
+               WRN_NP("Nothing to unregister - srv_ops = NULL\n");
+               return;
+       }
+
+       /* TODO: in order to support registration of multiple modules,
+        * introduce a list with srv_ops and search for the correct
+        * one.
+        */
+
+       if (srv_ops != ops) {
+               ERR_NP("Ops is not the ops we have registered\n");
+               return;
+       }
+
+       rdma_destroy_id(cm_id_ip);
+       cm_id_ip = NULL;
+       rdma_destroy_id(cm_id_ib);
+       cm_id_ib = NULL;
+       close_sessions();
+       flush_workqueue(destroy_wq);
+       ibtrs_srv_destroy_buf_pool();
+       srv_ops = NULL;
+}
+EXPORT_SYMBOL(ibtrs_srv_unregister);
+
+static int check_module_params(void)
+{
+       if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
+               ERR_NP("Invalid sess_queue_depth parameter value\n");
+               return -EINVAL;
+       }
+
+       /* check if IB immediate data size is enough to hold the mem_id and the
+        * offset inside the memory chunk
+        */
+       if (ilog2(sess_queue_depth - 1) + ilog2(rcv_buf_size - 1) >
+           IB_IMM_SIZE_BITS) {
+               ERR_NP("RDMA immediate size (%db) not enough to encode "
+                      "%d buffers of size %dB. Reduce 'sess_queue_depth' "
+                      "or 'max_io_size' parameters.\n", IB_IMM_SIZE_BITS,
+                      sess_queue_depth, rcv_buf_size);
+               return -EINVAL;
+       }
+
+       if (init_pool_size < 0) {
+               ERR_NP("Invalid 'init_pool_size' parameter value."
+                      " Value must be positive.\n");
+               return -EINVAL;
+       }
+
+       if (pool_size_hi_wm < init_pool_size) {
+               ERR_NP("Invalid 'pool_size_hi_wm' parameter value. Value must"
+                      " be iqual or higher than 'init_pool_size'.\n");
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+static void csm_init(struct ibtrs_con *con)
+{
+       DEB("initializing csm to %s\n", csm_state_str(CSM_STATE_REQUESTED));
+       csm_set_state(con, CSM_STATE_REQUESTED);
+}
+
+static int send_msg_sess_open_resp(struct ibtrs_con *con)
+{
+       struct ibtrs_msg_sess_open_resp *msg;
+       int err;
+       struct ibtrs_session *sess = con->sess;
+
+       msg = sess->rdma_info_iu->buf;
+
+       fill_ibtrs_msg_sess_open_resp(msg, con);
+
+       err = ibtrs_post_send(con->ib_con.qp, con->sess->dev->ib_sess.mr,
+                             sess->rdma_info_iu, msg->hdr.tsize);
+       if (unlikely(err))
+               ERR(sess, "Sending sess open resp failed, "
+                         "posting msg to QP failed, errno: %d\n", err);
+
+       return err;
+}
+
+static void queue_heartbeat_dwork(struct ibtrs_session *sess)
+{
+       ibtrs_set_last_heartbeat(&sess->heartbeat);
+       WARN_ON(!queue_delayed_work(sess->sm_wq,
+                                   &sess->send_heartbeat_dwork,
+                                   HEARTBEAT_INTV_JIFFIES));
+       WARN_ON(!queue_delayed_work(sess->sm_wq,
+                                   &sess->check_heartbeat_dwork,
+                                   HEARTBEAT_INTV_JIFFIES));
+}
+
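+/*
+ * Connection state machine: REQUESTED -> CONNECTED -> CLOSING/FLUSHING
+ * -> CLOSED.  Before a connection is destroyed a beacon is posted on
+ * its QP; CSM_EV_BEACON_COMPLETED then signals that all posted work
+ * requests have been flushed and the connection can be closed safely.
+ */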
+static void csm_requested(struct ibtrs_con *con, enum csm_ev ev)
+{
+       struct ibtrs_session *sess = con->sess;
+       enum csm_state state = con->state;
+
+       DEB("con %p, event %s\n", con, csm_ev_str(ev));
+       switch (ev) {
+       case CSM_EV_CON_ESTABLISHED: {
+               csm_set_state(con, CSM_STATE_CONNECTED);
+               if (con->user) {
+                       /* send back rdma info */
+                       if (send_msg_sess_open_resp(con))
+                               goto destroy;
+                       queue_heartbeat_dwork(con->sess);
+               }
+               ssm_schedule_event(sess, SSM_EV_CON_CONNECTED);
+               break;
+       }
+       case CSM_EV_DEVICE_REMOVAL:
+       case CSM_EV_CON_ERROR:
+       case CSM_EV_SESS_CLOSING:
+       case CSM_EV_CON_DISCONNECTED:
+destroy:
+               csm_set_state(con, CSM_STATE_CLOSED);
+               close_con(con);
+               ssm_schedule_event(sess, SSM_EV_CON_EST_ERR);
+               break;
+       default:
+               ERR(sess, "Connection received unexpected event %s "
+                   "in %s state.\n", csm_ev_str(ev), csm_state_str(state));
+       }
+}
+
+static void csm_connected(struct ibtrs_con *con, enum csm_ev ev)
+{
+       struct ibtrs_session *sess = con->sess;
+       enum csm_state state = con->state;
+
+       DEB("con %p, event %s\n", con, csm_ev_str(ev));
+       switch (ev) {
+       case CSM_EV_CON_ERROR:
+       case CSM_EV_SESS_CLOSING: {
+               int err;
+
+               csm_set_state(con, CSM_STATE_CLOSING);
+               err = rdma_disconnect(con->cm_id);
+               if (err)
+                       ERR(sess, "Connection received event %s "
+                           "in %s state, new state is %s, but failed to "
+                           "disconnect connection.\n", csm_ev_str(ev),
+                           csm_state_str(state), csm_state_str(con->state));
+               break;
+               }
+       case CSM_EV_DEVICE_REMOVAL:
+               /* Send a SSM_EV_SESS_CLOSE event to the session to speed up the
+                * closing of the other connections. If we just wait for the
+                * client to close all connections this can take a while.
+                */
+               ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+               /* fall-through */
+       case CSM_EV_CON_DISCONNECTED: {
+               int err, cnt = 0;
+
+               csm_set_state(con, CSM_STATE_FLUSHING);
+               err = rdma_disconnect(con->cm_id);
+               if (err)
+                       ERR(sess, "Connection received event %s "
+                           "in %s state, new state is %s, but failed to "
+                           "disconnect connection.\n", csm_ev_str(ev),
+                           csm_state_str(state), csm_state_str(con->state));
+
+               wait_event(sess->bufs_wait,
+                          !atomic_read(&sess->stats.rdma_stats.inflight));
+               DEB("posting beacon on con %p\n", con);
+               err = post_beacon(&con->ib_con);
+               if (err) {
+                       ERR(sess, "Connection received event %s "
+                           "in %s state, new state is %s but failed to post"
+                           " beacon, closing connection.\n", csm_ev_str(ev),
+                           csm_state_str(state), csm_state_str(con->state));
+                       goto destroy;
+               }
+
+               err = ibtrs_request_cq_notifications(&con->ib_con);
+               if (unlikely(err < 0)) {
+                       WRN(con->sess, "Requesting CQ Notification for"
+                           " ib_con failed. Connection will be destroyed\n");
+                       goto destroy;
+               } else if (err > 0) {
+                       err = get_process_wcs(con, &cnt);
+                       if (unlikely(err))
+                               goto destroy;
+                       break;
+               }
+               break;
+
+destroy:
+               csm_set_state(con, CSM_STATE_CLOSED);
+               close_con(con);
+               ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+
+               break;
+               }
+       default:
+               ERR(sess, "Connection received unexpected event %s "
+                   "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+       }
+}
+
+static void csm_closing(struct ibtrs_con *con, enum csm_ev ev)
+{
+       struct ibtrs_session *sess = con->sess;
+       enum csm_state state = con->state;
+
+       DEB("con %p, event %s\n", con, csm_ev_str(ev));
+       switch (ev) {
+       case CSM_EV_DEVICE_REMOVAL:
+       case CSM_EV_CON_DISCONNECTED: {
+               int err, cnt = 0;
+
+               csm_set_state(con, CSM_STATE_FLUSHING);
+
+               wait_event(sess->bufs_wait,
+                          !atomic_read(&sess->stats.rdma_stats.inflight));
+
+               DEB("posting beacon on con %p\n", con);
+               if (post_beacon(&con->ib_con)) {
+                       ERR(sess, "Connection received event %s "
+                           "in %s state, new state is %s but failed to post"
+                           " beacon, closing connection.\n", csm_ev_str(ev),
+                           csm_state_str(state), csm_state_str(con->state));
+                       goto destroy;
+               }
+
+               err = ibtrs_request_cq_notifications(&con->ib_con);
+               if (unlikely(err < 0)) {
+                       WRN(con->sess, "Requesting CQ Notification for"
+                           " ib_con failed. Connection will be destroyed\n");
+                       goto destroy;
+               } else if (err > 0) {
+                       err = get_process_wcs(con, &cnt);
+                       if (unlikely(err))
+                               goto destroy;
+                       break;
+               }
+               break;
+
+destroy:
+               csm_set_state(con, CSM_STATE_CLOSED);
+               close_con(con);
+               ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+               break;
+       }
+       case CSM_EV_CON_ERROR:
+               /* ignore connection errors, just wait for CM_DISCONNECTED */
+       case CSM_EV_SESS_CLOSING:
+               break;
+       default:
+               ERR(sess, "Connection received unexpected event %s "
+                   "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+       }
+}
+
+static void csm_flushing(struct ibtrs_con *con, enum csm_ev ev)
+{
+       struct ibtrs_session *sess = con->sess;
+       enum csm_state state = con->state;
+
+       DEB("con %p, event %s\n", con, csm_ev_str(ev));
+
+       switch (ev) {
+       case CSM_EV_BEACON_COMPLETED:
+               csm_set_state(con, CSM_STATE_CLOSED);
+               close_con(con);
+               ssm_schedule_event(sess, SSM_EV_CON_DISCONNECTED);
+               break;
+       case CSM_EV_SESS_CLOSING:
+       case CSM_EV_DEVICE_REMOVAL:
+               /* Ignore CSM_EV_DEVICE_REMOVAL and CSM_EV_SESS_CLOSING in
+                * this state. The beacon was already posted, so the
+                * CSM_EV_BEACON_COMPLETED event should arrive anytime soon.
+                */
+               break;
+       case CSM_EV_CON_ERROR:
+               break;
+       case CSM_EV_CON_DISCONNECTED:
+               /* Ignore CSM_EV_CON_DISCONNECTED. At this point we could have
+                * already received a CSM_EV_CON_DISCONNECTED for the same
+                * connection, but an additional RDMA_CM_EVENT_DISCONNECTED or
+                * RDMA_CM_EVENT_TIMEWAIT_EXIT could be generated.
+                */
+               break;
+       default:
+               ERR(sess, "Connection received unexpected event %s "
+                   "in %s state\n", csm_ev_str(ev), csm_state_str(state));
+       }
+}
+
+static void csm_closed(struct ibtrs_con *con, enum csm_ev ev)
+{
+       /* in this state, we ignore every event scheduled for this connection
+        * and just wait for the session workqueue to be flushed and the
+        * connection freed
+        */
+       DEB("con %p, event %s\n", con, csm_ev_str(ev));
+}
+
+typedef void (ibtrs_srv_csm_ev_handler_fn)(struct ibtrs_con *, enum csm_ev);
+
+static ibtrs_srv_csm_ev_handler_fn *ibtrs_srv_csm_ev_handlers[] = {
+       [CSM_STATE_REQUESTED]           = csm_requested,
+       [CSM_STATE_CONNECTED]           = csm_connected,
+       [CSM_STATE_CLOSING]             = csm_closing,
+       [CSM_STATE_FLUSHING]            = csm_flushing,
+       [CSM_STATE_CLOSED]              = csm_closed,
+};
+
+static inline void ibtrs_srv_csm_ev_handle(struct ibtrs_con *con,
+                                          enum csm_ev ev)
+{
+       return (*ibtrs_srv_csm_ev_handlers[con->state])(con, ev);
+}
+
+static void csm_worker(struct work_struct *work)
+{
+       struct csm_work *csm_w = container_of(work, struct csm_work, work);
+
+       ibtrs_srv_csm_ev_handle(csm_w->con, csm_w->ev);
+       kvfree(csm_w);
+}
+
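+/*
+ * Queue a CSM event on the session state machine workqueue.  A session
+ * reference is held across queueing so the session and its sm_wq cannot
+ * go away underneath us; the allocation is retried in a loop because
+ * state machine events must not be dropped.
+ */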
+static void csm_schedule_event(struct ibtrs_con *con, enum csm_ev ev)
+{
+       struct csm_work *w;
+
+       if (!ibtrs_srv_sess_get(con->sess))
+               return;
+
+       while (true) {
+               if (con->state == CSM_STATE_CLOSED)
+                       goto out;
+               w = ibtrs_malloc(sizeof(*w));
+               if (w)
+                       break;
+               cond_resched();
+       }
+
+       w->con = con;
+       w->ev = ev;
+       INIT_WORK(&w->work, csm_worker);
+       queue_work(con->sess->sm_wq, &w->work);
+
+out:
+       ibtrs_srv_sess_put(con->sess);
+}
+
+static void sess_schedule_csm_event(struct ibtrs_session *sess, enum csm_ev ev)
+{
+       struct ibtrs_con *con;
+
+       list_for_each_entry(con, &sess->con_list, list)
+               csm_schedule_event(con, ev);
+}
+
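+/*
+ * Remove the session's sysfs entries (if present) and schedule the
+ * corresponding kobject put on a worker.
+ */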
+static void remove_sess_from_sysfs(struct ibtrs_session *sess)
+{
+       if (!sess->state_in_sysfs)
+               return;
+
+       kobject_del(&sess->kobj_stats);
+       kobject_del(&sess->kobj);
+       sess->state_in_sysfs = false;
+
+       ibtrs_srv_schedule_sysfs_put(sess);
+}
+
+static __always_inline int
+__ibtrs_srv_request_cq_notifications(struct ibtrs_con *con)
+{
+       return ibtrs_request_cq_notifications(&con->ib_con);
+}
+
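+/*
+ * Re-arm CQ completion notifications on all connected connections.  A
+ * positive return from the request indicates completions may have
+ * arrived while notifications were disarmed, so drain the CQ before
+ * returning.
+ */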
+static int ibtrs_srv_request_cq_notifications(struct ibtrs_session *sess)
+{
+       struct ibtrs_con *con;
+       int err, cnt = 0;
+
+       list_for_each_entry(con, &sess->con_list, list)  {
+               if (con->state == CSM_STATE_CONNECTED) {
+                       err = __ibtrs_srv_request_cq_notifications(con);
+                       if (unlikely(err < 0))
+                               return err;
+                       if (err > 0) {
+                               err = get_process_wcs(con, &cnt);
+                               if (unlikely(err))
+                                       return err;
+                       }
+               }
+       }
+
+       return 0;
+}
+
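+/*
+ * SSM_STATE_IDLE: the session is waiting for all of its connections to
+ * be established.  Once est_cnt reaches con_cnt, the sysfs entries are
+ * created, the user is notified and the session moves to
+ * SSM_STATE_CONNECTED.
+ */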
+static void ssm_idle(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+       enum ssm_state state = sess->state;
+
+       DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+           sess->est_cnt);
+       switch (ev) {
+       case SSM_EV_CON_DISCONNECTED:
+               sess->est_cnt--;
+               /* fall through */
+       case SSM_EV_CON_EST_ERR:
+               if (!sess->active_cnt) {
+                       ibtrs_srv_destroy_ib_session(sess);
+                       ssm_set_state(sess, SSM_STATE_CLOSED);
+                       cancel_delayed_work(&sess->check_heartbeat_dwork);
+                       schedule_sess_put(sess);
+               } else {
+                       ssm_set_state(sess, SSM_STATE_CLOSING);
+               }
+               break;
+       case SSM_EV_CON_CONNECTED: {
+               int err;
+
+               sess->est_cnt++;
+               if (sess->est_cnt != sess->con_cnt)
+                       break;
+
+               err = ibtrs_srv_create_sess_files(sess);
+               if (err) {
+                       if (err == -EEXIST)
+                               ERR(sess,
+                                   "Session sysfs files already exist,"
+                                   " possibly a user-space process is"
+                                   " holding them\n");
+                       else
+                               ERR(sess,
+                                   "Create session sysfs files failed,"
+                                   " errno: %d\n", err);
+                       goto destroy;
+               }
+
+               sess->state_in_sysfs = true;
+
+               err = ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_CONNECTED);
+               if (err) {
+                       ERR(sess, "Notifying user session event"
+                           " failed, errno: %d\n. Session is closed", err);
+                       goto destroy;
+               }
+
+               ssm_set_state(sess, SSM_STATE_CONNECTED);
+               err = ibtrs_srv_request_cq_notifications(sess);
+               if (err) {
+                       ERR(sess, "Requesting CQ completion notifications"
+                           " failed, errno: %d. Session will be closed.\n",
+                           err);
+                       goto destroy;
+               }
+
+               break;
+destroy:
+               remove_sess_from_sysfs(sess);
+               ssm_set_state(sess, SSM_STATE_CLOSING);
+               sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+               break;
+       }
+       case SSM_EV_SESS_CLOSE:
+               ssm_set_state(sess, SSM_STATE_CLOSING);
+               sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+               break;
+       default:
+               ERR(sess, "Session received unexpected event %s "
+                   "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+       }
+}
+
+static void ssm_connected(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+       enum ssm_state state = sess->state;
+
+       DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+           sess->est_cnt);
+       switch (ev) {
+       case SSM_EV_CON_DISCONNECTED:
+               remove_sess_from_sysfs(sess);
+               sess->est_cnt--;
+
+               ssm_set_state(sess, SSM_STATE_CLOSING);
+               ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTING);
+               break;
+       case SSM_EV_SESS_CLOSE:
+       case SSM_EV_SYSFS_DISCONNECT:
+               remove_sess_from_sysfs(sess);
+               ssm_set_state(sess, SSM_STATE_CLOSING);
+               ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTING);
+
+               sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+               break;
+       default:
+               ERR(sess, "Session received unexpected event %s "
+                   "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+       }
+}
+
+static void ssm_closing(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+       enum ssm_state state = sess->state;
+
+       DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+           sess->est_cnt);
+       switch (ev) {
+       case SSM_EV_CON_CONNECTED:
+               sess->est_cnt++;
+               break;
+       case SSM_EV_CON_DISCONNECTED:
+               sess->est_cnt--;
+               /* fall through */
+       case SSM_EV_CON_EST_ERR:
+               if (sess->active_cnt == 0) {
+                       ibtrs_srv_destroy_ib_session(sess);
+                       ssm_set_state(sess, SSM_STATE_CLOSED);
+                       ibtrs_srv_sess_ev(sess, IBTRS_SRV_SESS_EV_DISCONNECTED);
+                       cancel_delayed_work(&sess->check_heartbeat_dwork);
+                       schedule_sess_put(sess);
+               }
+               break;
+       case SSM_EV_SESS_CLOSE:
+               sess_schedule_csm_event(sess, CSM_EV_SESS_CLOSING);
+               break;
+       case SSM_EV_SYSFS_DISCONNECT:
+               /* just ignore it, the connection should have a
+                * CSM_EV_SESS_CLOSING event on the queue to be
+                * processed later
+                */
+               break;
+       default:
+               ERR(sess, "Session received unexpected event %s "
+                   "in %s state.\n", ssm_ev_str(ev), ssm_state_str(state));
+       }
+}
+
+static void ssm_closed(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+       /* in this state, we ignore every event and wait for the session
+        * to be destroyed
+        */
+       DEB("sess %p, event %s, est_cnt=%d\n", sess, ssm_ev_str(ev),
+           sess->est_cnt);
+}
+
+typedef void (ssm_ev_handler_fn)(struct ibtrs_session *, enum ssm_ev);
+
+static ssm_ev_handler_fn *ibtrs_srv_ev_handlers[] = {
+       [SSM_STATE_IDLE]                = ssm_idle,
+       [SSM_STATE_CONNECTED]           = ssm_connected,
+       [SSM_STATE_CLOSING]             = ssm_closing,
+       [SSM_STATE_CLOSED]              = ssm_closed,
+};
+
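+/*
+ * Periodically verify that a heartbeat was received from the client in
+ * time; close the session on timeout, otherwise re-queue ourselves.
+ */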
+static void check_heartbeat_work(struct work_struct *work)
+{
+       struct ibtrs_session *sess;
+
+       sess = container_of(to_delayed_work(work), struct ibtrs_session,
+                           check_heartbeat_dwork);
+
+       if (ibtrs_heartbeat_timeout_is_expired(&sess->heartbeat)) {
+               ssm_schedule_event(sess, SSM_EV_SESS_CLOSE);
+               return;
+       }
+
+       ibtrs_heartbeat_warn(&sess->heartbeat);
+
+       if (WARN_ON(!queue_delayed_work(sess->sm_wq,
+                                       &sess->check_heartbeat_dwork,
+                                       HEARTBEAT_INTV_JIFFIES)))
+               WRN_RL(sess, "Schedule check heartbeat work failed, "
+                      "check_heartbeat worker already queued?\n");
+}
+
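+/*
+ * Periodically send a heartbeat to the client if nothing was sent for
+ * HEARTBEAT_INTV_MS, then re-queue ourselves.  If sending fails, no
+ * further heartbeats are scheduled.
+ */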
+static void send_heartbeat_work(struct work_struct *work)
+{
+       struct ibtrs_session *sess;
+       int err;
+
+       sess = container_of(to_delayed_work(work), struct ibtrs_session,
+                           send_heartbeat_dwork);
+
+       if (ibtrs_heartbeat_send_ts_diff_ms(&sess->heartbeat) >=
+           HEARTBEAT_INTV_MS) {
+               err = send_heartbeat(sess);
+               if (unlikely(err)) {
+                       WRN_RL(sess,
+                              "Sending heartbeat failed, errno: %d,"
+                              " no further heartbeat will be sent\n", err);
+                       return;
+               }
+       }
+
+       if (WARN_ON(!queue_delayed_work(sess->sm_wq,
+                                       &sess->send_heartbeat_dwork,
+                                       HEARTBEAT_INTV_JIFFIES)))
+               WRN_RL(sess, "schedule send heartbeat work failed, "
+                      "send_heartbeat worker already queued?\n");
+}
+
+static inline void ssm_ev_handle(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+       (*ibtrs_srv_ev_handlers[sess->state])(sess, ev);
+}
+
+static void ssm_worker(struct work_struct *work)
+{
+       struct ssm_work *ssm_w = container_of(work, struct ssm_work, work);
+
+       ssm_ev_handle(ssm_w->sess, ssm_w->ev);
+       kvfree(ssm_w);
+}
+
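+/*
+ * Queue a session state machine event on the session state machine
+ * workqueue.  As with csm_schedule_event(), the allocation is retried
+ * until it succeeds so that no event is lost.  Returns -EPERM if the
+ * session is already closed or its reference could not be taken.
+ */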
+static int ssm_schedule_event(struct ibtrs_session *sess, enum ssm_ev ev)
+{
+       struct ssm_work *w;
+       int ret = 0;
+
+       if (!ibtrs_srv_sess_get(sess))
+               return -EPERM;
+
+       while (true) {
+               if (sess->state == SSM_STATE_CLOSED) {
+                       ret = -EPERM;
+                       goto out;
+               }
+               w = ibtrs_malloc(sizeof(*w));
+               if (w)
+                       break;
+               cond_resched();
+       }
+
+       w->sess = sess;
+       w->ev = ev;
+       INIT_WORK(&w->work, ssm_worker);
+       queue_work(sess->sm_wq, &w->work);
+
+out:
+       ibtrs_srv_sess_put(sess);
+       return ret;
+}
+
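+/*
+ * Initialize the session state machine: create the single-threaded
+ * state machine workqueue, set up the heartbeat workers and start in
+ * SSM_STATE_IDLE.
+ */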
+static int ssm_init(struct ibtrs_session *sess)
+{
+       sess->sm_wq = create_singlethread_workqueue("ibtrs_ssm_wq");
+       if (!sess->sm_wq)
+               return -ENOMEM;
+
+       INIT_DELAYED_WORK(&sess->check_heartbeat_dwork, check_heartbeat_work);
+       INIT_DELAYED_WORK(&sess->send_heartbeat_dwork, send_heartbeat_work);
+
+       ssm_set_state(sess, SSM_STATE_IDLE);
+
+       return 0;
+}
+
+static int ibtrs_srv_create_debugfs_files(void)
+{
+       int ret = 0;
+       struct dentry *file;
+
+       ibtrs_srv_debugfs_dir = debugfs_create_dir("ibtrs_server", NULL);
+       if (IS_ERR_OR_NULL(ibtrs_srv_debugfs_dir)) {
+               ret = ibtrs_srv_debugfs_dir ?
+                     PTR_ERR(ibtrs_srv_debugfs_dir) : -EINVAL;
+               ibtrs_srv_debugfs_dir = NULL;
+               if (ret == -ENODEV)
+                       WRN_NP("Debugfs not enabled in kernel\n");
+               else
+                       WRN_NP("Failed to create top-level debugfs directory,"
+                              " errno: %d\n", ret);
+               goto out;
+       }
+
+       mempool_debugfs_dir = debugfs_create_dir("mempool",
+                                                ibtrs_srv_debugfs_dir);
+       if (IS_ERR_OR_NULL(mempool_debugfs_dir)) {
+               ret = mempool_debugfs_dir ?
+                     PTR_ERR(mempool_debugfs_dir) : -EINVAL;
+               WRN_NP("Failed to create mempool debugfs directory,"
+                      " errno: %d\n", ret);
+               goto out_remove;
+       }
+
+       file = debugfs_create_u32("nr_free_buf_pool", 0444,
+                                 mempool_debugfs_dir, &nr_free_buf_pool);
+       if (IS_ERR_OR_NULL(file)) {
+               WRN_NP("Failed to create mempool \"nr_free_buf_pool\""
+                      " debugfs file\n");
+               ret = -EINVAL;
+               goto out_remove;
+       }
+
+       file = debugfs_create_u32("nr_total_buf_pool", 0444,
+                                 mempool_debugfs_dir, &nr_total_buf_pool);
+       if (IS_ERR_OR_NULL(file)) {
+               WRN_NP("Failed to create mempool \"nr_total_buf_pool\""
+                      " debugfs file\n");
+               ret = -EINVAL;
+               goto out_remove;
+       }
+
+       file = debugfs_create_u32("nr_active_sessions", 0444,
+                                 mempool_debugfs_dir, &nr_active_sessions);
+       if (IS_ERR_OR_NULL(file)) {
+               WRN_NP("Failed to create mempool \"nr_active_sessions\""
+                      " debugfs file\n");
+               ret = -EINVAL;
+               goto out_remove;
+       }
+
+       goto out;
+
+out_remove:
+       debugfs_remove_recursive(ibtrs_srv_debugfs_dir);
+       ibtrs_srv_debugfs_dir = NULL;
+       mempool_debugfs_dir = NULL;
+out:
+       return ret;
+}
+
+static void ibtrs_srv_destroy_debugfs_files(void)
+{
+       debugfs_remove_recursive(ibtrs_srv_debugfs_dir);
+}
+
+static int __init ibtrs_server_init(void)
+{
+       int err;
+
+       if (!strlen(cq_affinity_list))
+               init_cq_affinity();
+
+       scnprintf(hostname, sizeof(hostname), "%s", utsname()->nodename);
+       INFO_NP("Loading module ibtrs_server, version: %s ("
+               " retry_count: %d, "
+               " default_heartbeat_timeout_ms: %d,"
+               " cq_affinity_list: %s, max_io_size: %d,"
+               " sess_queue_depth: %d, init_pool_size: %d,"
+               " pool_size_hi_wm: %d, hostname: %s)\n",
+               __stringify(IBTRS_VER),
+               retry_count, default_heartbeat_timeout_ms,
+               cq_affinity_list, max_io_size, sess_queue_depth,
+               init_pool_size, pool_size_hi_wm, hostname);
+
+       err = check_module_params();
+       if (err) {
+               ERR_NP("Failed to load module, invalid module parameters,"
+                      " errno: %d\n", err);
+               return err;
+       }
+
+       destroy_wq = alloc_workqueue("ibtrs_server_destroy_wq", 0, 0);
+       if (!destroy_wq) {
+               ERR_NP("Failed to load module,"
+                      " alloc ibtrs_server_destroy_wq failed\n");
+               return -ENOMEM;
+       }
+
+       err = ibtrs_srv_create_sysfs_files();
+       if (err) {
+               ERR_NP("Failed to load module, can't create sysfs files,"
+                      " errno: %d\n", err);
+               goto out_destroy_wq;
+       }
+
+       err = ibtrs_srv_create_debugfs_files();
+       if (err)
+               WRN_NP("Unable to create debugfs files, errno: %d."
+                      " Continuing without debugfs\n", err);
+
+       return 0;
+
+out_destroy_wq:
+       destroy_workqueue(destroy_wq);
+       return err;
+}
+
+static void __exit ibtrs_server_exit(void)
+{
+       INFO_NP("Unloading module\n");
+       ibtrs_srv_destroy_debugfs_files();
+       ibtrs_srv_destroy_sysfs_files();
+       destroy_workqueue(destroy_wq);
+
+       INFO_NP("Module unloaded\n");
+}
+
+module_init(ibtrs_server_init);
+module_exit(ibtrs_server_exit);
-- 
2.7.4
