From: Liu Yuan <tailai...@taobao.com>

v2:
 - fix memory leak in check_idx()
 - remove extra whitespace
---------------------------------- >8

This lets the sheep daemon scale to serve more VMs on one node, growing
the per-node fd cache adaptively and automatically.

- fd count now defaults to 16 instead of the previous 8

Signed-off-by: Liu Yuan <tailai...@taobao.com>
---
 sheep/sheep.c        |    1 +
 sheep/sheep_priv.h   |    1 +
 sheep/sockfd_cache.c |  127 ++++++++++++++++++++++++++++++++++++++------------
 3 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index df28a94..380a129 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -344,6 +344,7 @@ int main(int argc, char **argv)
        sys->recovery_wqueue = init_work_queue("recovery", true);
        sys->deletion_wqueue = init_work_queue("deletion", true);
        sys->block_wqueue = init_work_queue("block", true);
+       sys->sockfd_wqueue = init_work_queue("sockfd", true);
        if (!sys->gateway_wqueue || !sys->io_wqueue ||!sys->recovery_wqueue ||
            !sys->deletion_wqueue || !sys->block_wqueue)
                exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index e455d27..530fe14 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -122,6 +122,7 @@ struct cluster_info {
        struct work_queue *deletion_wqueue;
        struct work_queue *recovery_wqueue;
        struct work_queue *block_wqueue;
+       struct work_queue *sockfd_wqueue;
 };
 
 struct siocb {
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index aa90299..c130ea6 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -48,23 +48,33 @@ static struct sockfd_cache sockfd_cache = {
        .lock = PTHREAD_RWLOCK_INITIALIZER,
 };
 
+/*
+ * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
+ * most 8 requests can be issued to the same sheep object. Based on this
+ * assumption, '16' would be efficient for servers that only host 2~4
+ * Guests.
+ *
+ * This fd count will be dynamically grown when the idx reaches watermark.
+ */
+#define DEFAULT_FDS_COUNT      16
+#define DEFAULT_FDS_WATERMARK  12
+
+/* How many FDs we cache for one node */
+static int fds_count = DEFAULT_FDS_COUNT;
+
+struct sockfd_cache_fd {
+       int fd;
+       uint8_t fd_in_use;
+};
+
 struct sockfd_cache_entry {
        struct rb_node rb;
        struct node_id nid;
-#define SOCKFD_CACHE_MAX_FD    8 /* How many FDs we cache for one node */
-       /*
-        * FIXME: Make this macro configurable.
-        *
-        * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
-        * most 8 requests can be issued to the same sheep object. Based on this
-        * assumption, '8' would be effecient for servers that only host 4~8
-        * Guests, but for powerful servers that can host dozens of Guests, we
-        * might consider bigger value.
-        */
-       int fd[SOCKFD_CACHE_MAX_FD];
-       uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
+       struct sockfd_cache_fd *fds;
 };
 
+#define fds_bytes (sizeof(struct sockfd_cache_fd) * fds_count)
+
 static struct sockfd_cache_entry *
 sockfd_cache_insert(struct sockfd_cache_entry *new)
 {
@@ -118,8 +128,8 @@ static inline int get_free_slot(struct sockfd_cache_entry *entry)
 {
        int idx = -1, i;
 
-       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
-               if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+       for (i = 0; i < fds_count; i++) {
+               if (uatomic_cmpxchg(&entry->fds[i].fd_in_use, 0, 1))
                        continue;
                idx = i;
                break;
@@ -155,8 +165,8 @@ out:
 static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 {
        int i;
-       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-               if (uatomic_read(&entry->fd_in_use[i]))
+       for (i = 0; i < fds_count; i++)
+               if (uatomic_read(&entry->fds[i].fd_in_use))
                        return false;
        return true;
 }
@@ -164,9 +174,9 @@ static inline bool slots_all_free(struct sockfd_cache_entry *entry)
 static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
 {
        int i;
-       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-               if (entry->fd[i] != -1)
-                       close(entry->fd[i]);
+       for (i = 0; i < fds_count; i++)
+               if (entry->fds[i].fd != -1)
+                       close(entry->fds[i].fd);
 }
 
 /*
@@ -220,11 +230,12 @@ void sockfd_cache_del(struct node_id *nid)
 
 static void sockfd_cache_add_nolock(struct node_id *nid)
 {
-       struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+       struct sockfd_cache_entry *new = xmalloc(sizeof(*new));
        int i;
 
-       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-               new->fd[i] = -1;
+       new->fds = xzalloc(fds_bytes);
+       for (i = 0; i < fds_count; i++)
+               new->fds[i].fd = -1;
 
        memcpy(&new->nid, nid, sizeof(struct node_id));
        if (sockfd_cache_insert(new)) {
@@ -251,15 +262,17 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
 /* Add one node to the cache means we can do caching tricks on this node */
 void sockfd_cache_add(struct node_id *nid)
 {
-       struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+       struct sockfd_cache_entry *new;
        char name[INET6_ADDRSTRLEN];
        int n, i;
 
-       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
-               new->fd[i] = -1;
+       pthread_rwlock_rdlock(&sockfd_cache.lock);
+       new = xmalloc(sizeof(*new));
+       new->fds = xzalloc(fds_bytes);
+       for (i = 0; i < fds_count; i++)
+               new->fds[i].fd = -1;
 
        memcpy(&new->nid, nid, sizeof(struct node_id));
-       pthread_rwlock_rdlock(&sockfd_cache.lock);
        if (sockfd_cache_insert(new)) {
                free(new);
                pthread_rwlock_unlock(&sockfd_cache.lock);
@@ -271,6 +284,56 @@ void sockfd_cache_add(struct node_id *nid)
        dprintf("%s:%d, count %d\n", name, nid->port, n);
 }
 
+static void do_grow_fds(struct work *work)
+{
+       struct sockfd_cache_entry *entry;
+       struct rb_node *p;
+       int old_fds_count, new_fds_count, new_size, i;
+
+       dprintf("%d\n", fds_count);
+       pthread_rwlock_wrlock(&sockfd_cache.lock);
+       old_fds_count = fds_count;
+       new_fds_count = fds_count * 2;
+       new_size = fds_bytes * 2;
+       for (p = rb_first(&sockfd_cache.root); p; p = rb_next(p)) {
+               entry = rb_entry(p, struct sockfd_cache_entry, rb);
+               entry->fds = xrealloc(entry->fds, new_size);
+               for (i = old_fds_count; i < new_fds_count; i++) {
+                       entry->fds[i].fd = -1;
+                       entry->fds[i].fd_in_use = 0;
+               }
+       }
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+static bool fds_in_grow;
+static int fds_high_watermark = DEFAULT_FDS_WATERMARK;
+
+static void grow_fds_done(struct work *work)
+{
+       fds_in_grow = false;
+       fds_count *= 2;
+       fds_high_watermark = fds_count * 3 / 4;
+       dprintf("fd count has been grown into %d\n", fds_count);
+       free(work);
+}
+
+static void inline check_idx(int idx)
+{
+       struct work *w;
+
+       if (idx <= fds_high_watermark)
+               return;
+       if (fds_in_grow)
+               return;
+
+       w = xmalloc(sizeof(*w));
+       w->fn = do_grow_fds;
+       w->done = grow_fds_done;
+       fds_in_grow = true;
+       queue_work(sys->sockfd_wqueue, w);
+}
+
 static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
 {
        struct sockfd_cache_entry *entry;
@@ -281,7 +344,9 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
        if (!entry)
                return NULL;
 
-       if (entry->fd[idx] != -1) {
+       check_idx(idx);
+
+       if (entry->fds[idx].fd != -1) {
                dprintf("%s:%d, idx %d\n", name, nid->port, idx);
                goto out;
        }
@@ -290,14 +355,14 @@ static struct sockfd *sockfd_cache_get(struct node_id *nid, char *name)
        dprintf("create connection %s:%d idx %d\n", name, nid->port, idx);
        fd = connect_to(name, nid->port);
        if (fd < 0) {
-               uatomic_dec(&entry->fd_in_use[idx]);
+               uatomic_dec(&entry->fds[idx].fd_in_use);
                return NULL;
        }
-       entry->fd[idx] = fd;
+       entry->fds[idx].fd = fd;
 
 out:
        sfd = xmalloc(sizeof(*sfd));
-       sfd->fd = entry->fd[idx];
+       sfd->fd = entry->fds[idx].fd;
        sfd->idx = idx;
        return sfd;
 }
@@ -316,7 +381,7 @@ static void sockfd_cache_put(struct node_id *nid, int idx)
        pthread_rwlock_unlock(&sockfd_cache.lock);
 
        assert(entry);
-       refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
+       refcnt = uatomic_cmpxchg(&entry->fds[idx].fd_in_use, 1, 0);
        assert(refcnt == 1);
 }
 
-- 
1.7.10.2

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to