At Fri, 18 Jul 2014 15:58:12 +0800,
Ruoyu wrote:
> 
> ZooKeeper occupies more and more memory in our production environment
> because the nodes in /sheepdog/queue are never deleted, even after
> their messages have been handled.
> 
> So, we need a tool to list and purge them periodically.
> 
> Signed-off-by: Ruoyu <[email protected]>
> ---
>  tools/zk_control.c | 231 
> +++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 231 insertions(+)

Applied, thanks. BTW, from next time, could you write a brief description
of the differences between versions here?

Thanks,
Hitoshi

> 
> diff --git a/tools/zk_control.c b/tools/zk_control.c
> index 6b3b136..53fd1d0 100644
> --- a/tools/zk_control.c
> +++ b/tools/zk_control.c
> @@ -13,6 +13,14 @@
>  
>  #include <zookeeper/zookeeper.h>
>  #include <string.h>
> +#include <arpa/inet.h>
> +#include <stddef.h>
> +
> +#include "list.h"
> +#include "rbtree.h"
> +#include "internal_proto.h"
> +
> +/* Parent znode whose children hold the queued struct zk_event records. */
> +#define QUEUE_ZONE "/sheepdog/queue"
> +/* Lower bound, in seconds (86400 s = one day), for the "purge" threshold. */
> +#define MIN_THRESHOLD 86400
>  
>  #define FOR_EACH_ZNODE(parent, path, strs)                          \
>       for ((strs)->data += (strs)->count;                            \
> @@ -21,9 +29,90 @@
>                             *--(strs)->data) : (free((strs)->data), 0); \
>            free(*(strs)->data))
>  
> +/*
> + * Type tag stored in the "type" field of struct zk_event.  The values are
> + * decoded from raw znode data, so they are spelled out explicitly; they
> + * must match whatever process writes the queue (presumably sheep's
> + * ZooKeeper cluster driver -- TODO confirm).
> + */
> +enum zk_event_type {
> +     EVENT_JOIN        = 1,
> +     EVENT_ACCEPT      = 2,
> +     EVENT_LEAVE       = 3,
> +     EVENT_BLOCK       = 4,
> +     EVENT_UNBLOCK     = 5,
> +     EVENT_NOTIFY      = 6,
> +     EVENT_UPDATE_NODE = 7,
> +};
> +
> +/*
> + * Node record embedded in struct zk_event as its "sender" field.  do_list()
> + * only reads node.nid, but every member is kept so that the in-memory
> + * layout matches the bytes copied out of the queue znodes
> + * (NOTE(review): assumes the writer uses this exact definition).
> + */
> +struct zk_node {
> +     struct list_node list;
> +     struct rb_node rb;
> +     struct sd_node node;
> +     bool callbacked;
> +     bool gone;
> +};
> +
> +/* Capacity of the inline payload buffer at the end of struct zk_event. */
> +#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */
> +
> +/*
> + * One queued event.  Callers decode a queue znode by letting zk_get_data()
> + * copy its raw data over this struct, so the layout must match the writer's.
> + * Because buf is ZK_MAX_BUF_SIZE bytes, an automatic variable of this type
> + * takes more than 1 MiB of stack.
> + */
> +struct zk_event {
> +     uint64_t id;
> +     enum zk_event_type type;
> +     struct zk_node sender;
> +     size_t msg_len;
> +     size_t nr_nodes;
> +     size_t buf_len;
> +     uint8_t buf[ZK_MAX_BUF_SIZE];
> +};
> +
> +/* Default ZooKeeper connection string (host:port). */
>  static const char *hosts = "127.0.0.1:2181";
> +/* Session handle used by the zk_* wrappers, e.g. zk_get_data(). */
>  static zhandle_t *zk_handle;
>  
> +/*
> + * Map a zk_event_type value to its printable name.  Values outside the
> + * known set (including 0 and negatives) map to "UNKNOWN".  The result is a
> + * string literal and must not be freed.
> + */
> +static const char *evtype_to_str(int type)
> +{
> +     static const char *const names[] = {
> +             [EVENT_JOIN]        = "JOIN",
> +             [EVENT_ACCEPT]      = "ACCEPT",
> +             [EVENT_LEAVE]       = "LEAVE",
> +             [EVENT_BLOCK]       = "BLOCK",
> +             [EVENT_UNBLOCK]     = "UNBLOCK",
> +             [EVENT_NOTIFY]      = "NOTIFY",
> +             [EVENT_UPDATE_NODE] = "UPDATE_NODE",
> +     };
> +     const size_t nr_names = sizeof(names) / sizeof(names[0]);
> +
> +     /* slot 0 is unused by any event type and therefore NULL */
> +     if (type < 0 || (size_t)type >= nr_names || names[type] == NULL)
> +             return "UNKNOWN";
> +
> +     return names[type];
> +}
> +
> +/*
> + * Format a 16-byte node address, plus ":port" when port is non-zero.
> + *
> + * Twelve zero bytes followed by a non-zero 13th byte are treated as an IPv4
> + * address held in the last 4 bytes; anything else is printed as IPv6.  If
> + * the conversion fails, an error is logged and "unknown" is used as the
> + * address.  The result lives in a per-thread static buffer that the next
> + * call from the same thread overwrites.
> + */
> +static const char *addr_to_str(const uint8_t *addr, uint16_t port)
> +{
> +     static __thread char str[HOST_NAME_MAX + 8];
> +     static const uint8_t v4_prefix[12];     /* all zero */
> +     const uint8_t *start = addr;
> +     int af = AF_INET6;
> +     size_t len;
> +
> +     if (addr[12] && memcmp(addr, v4_prefix, sizeof(v4_prefix)) == 0) {
> +             af = AF_INET;
> +             start = addr + 12;
> +     }
> +
> +     if (unlikely(inet_ntop(af, start, str, sizeof(str)) == NULL)) {
> +             fprintf(stderr, "failed to convert addr to string, %m\n");
> +             /*
> +              * str is unspecified after a failed inet_ntop(); don't hand
> +              * back a stale address from an earlier call.
> +              */
> +             snprintf(str, sizeof(str), "unknown");
> +     }
> +
> +     if (port) {
> +             len = strlen(str);
> +             snprintf(str + len, sizeof(str) - len, ":%u", (unsigned)port);
> +     }
> +
> +     return str;
> +}
> +
>  static inline ZOOAPI int zk_delete_node(const char *path)
>  {
>       int rc;
> @@ -45,6 +134,18 @@ static inline ZOOAPI int zk_get_children(const char *path,
>       return rc;
>  }
>  
> +/*
> + * Read the data and Stat of znode @path into @buffer and @stat, retrying
> + * for as long as the client reports ZOPERATIONTIMEOUT or ZCONNECTIONLOSS.
> + *
> + * On entry *buffer_len is the capacity of @buffer.  On return it holds the
> + * length of the data copied (-1 when the znode has no data, per the
> + * ZooKeeper C client documentation).  Returns the final zoo_get() code.
> + *
> + * No watch is set: none of the callers handle change notifications, and a
> + * watch on every queue znode read would only add server-side state.
> + */
> +static inline ZOOAPI int zk_get_data(const char *path, void *buffer,
> +                                  int *buffer_len, struct Stat *stat)
> +{
> +     int rc;
> +     do {
> +             rc = zoo_get(zk_handle, path, 0, (char *)buffer,
> +                          buffer_len, stat);
> +     } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
> +
> +     return rc;
> +}
> +
>  static int do_kill(int argc, char **argv)
>  {
>       char *path;
> @@ -130,6 +231,134 @@ err:
>       return -1;
>  }
>  
> +/*
> + * "list" subcommand: print one table row per child znode of QUEUE_ZONE
> + * with the child's sequence number, the event id and type, the sender
> + * address, the msg_len/nr_nodes/buf_len header fields and the znode's
> + * creation and modification times.  Then print the number of rows.
> + *
> + * Skipped children:
> + *  - children deleted after being listed (e.g. by a concurrent "purge");
> + *  - children whose data is too short to hold the zk_event header.
> + *
> + * argc/argv are unused.  Returns 0 on success, or -1 after printing an
> + * error to stderr.
> + */
> +static int do_list(int argc, char **argv)
> +{
> +     struct String_vector strs;
> +     int rc, len, total = 0;
> +     char path[256], str_ctime[128], str_mtime[128];
> +     time_t t1, t2;
> +     struct tm tm_ctime, tm_mtime;
> +     struct zk_event ev;
> +     struct Stat stat;
> +     int32_t seq;
> +     struct node_id *nid;
> +
> +     fprintf(stdout, "     QUEUE                ID          TYPE"
> +             "                 SENDER  MSG LEN    NR  BUF LEN"
> +             "          CREATE TIME          MODIFY TIME\n");
> +     rc = zk_get_children(QUEUE_ZONE, &strs);
> +     switch (rc) {
> +     case ZOK:
> +             FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
> +                     len = sizeof(struct zk_event);
> +                     rc = zk_get_data(path, &ev, &len, &stat);
> +                     if (rc == ZNONODE)
> +                             continue; /* removed since it was listed */
> +                     if (rc != ZOK) {
> +                             fprintf(stderr, "failed to get data "
> +                                     "%s, %s\n",
> +                                     path, zerror(rc));
> +                             goto err;
> +                     }
> +
> +                     /*
> +                      * A short or empty (len == -1) znode leaves ev's header
> +                      * uninitialized or holding the previous child's values,
> +                      * so don't print it.  offsetof() is cast to int so that
> +                      * len == -1 compares as negative, not as a huge size_t.
> +                      */
> +                     if (len < (int)offsetof(struct zk_event, buf)) {
> +                             fprintf(stderr, "skip %s, data is too short "
> +                                     "(%d bytes)\n", path, len);
> +                             continue;
> +                     }
> +
> +                     /* Stat times are in milliseconds */
> +                     t1 = stat.ctime / 1000;
> +                     localtime_r(&t1, &tm_ctime);
> +                     strftime(str_ctime, sizeof(str_ctime),
> +                                     "%Y-%m-%d %H:%M:%S", &tm_ctime);
> +
> +                     t2 = stat.mtime / 1000;
> +                     localtime_r(&t2, &tm_mtime);
> +                     strftime(str_mtime, sizeof(str_mtime),
> +                                     "%Y-%m-%d %H:%M:%S", &tm_mtime);
> +
> +                     /* scanf needs the SCN macro; -1 marks a bad name */
> +                     if (sscanf(path, QUEUE_ZONE "/%"SCNd32, &seq) != 1)
> +                             seq = -1;
> +                     nid = &ev.sender.node.nid;
> +                     fprintf(stdout, "%010"PRId32"  %016"PRIx64
> +                             "  %12s  %21s  %7zu  %4zu  %7zu  %s  %s\n",
> +                             seq, ev.id, evtype_to_str(ev.type),
> +                             addr_to_str(nid->addr, nid->port),
> +                             ev.msg_len, ev.nr_nodes, ev.buf_len,
> +                             str_ctime, str_mtime);
> +                     total++;
> +             }
> +             break;
> +     default:
> +             goto err;
> +     }
> +
> +     fprintf(stdout, "\ntotal nodes: %d\n", total);
> +     return 0;
> +err:
> +     fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc));
> +     return -1;
> +}
> +
> +/*
> + * "purge <threshold>" subcommand: delete every child znode of QUEUE_ZONE
> + * whose last modification is more than <threshold> seconds old.
> + *
> + * - <threshold> must be a plain decimal integer.
> + * - Values below MIN_THRESHOLD are raised to MIN_THRESHOLD.
> + * - Children that vanish during the purge (e.g. a concurrent purge) are
> + *   skipped.
> + * - Progress is printed every 100 deletions.
> + *
> + * Returns 0 on success, or -1 after printing an error to stderr.
> + */
> +static int do_purge(int argc, char **argv)
> +{
> +     struct String_vector strs;
> +     int rc, len, deleted = 0;
> +     long threshold;
> +     char *p, path[256];
> +     /*
> +      * Only the Stat (mtime) is used, so read at most one byte of data
> +      * instead of filling a 1 MiB struct zk_event on the stack.
> +      */
> +     char data[1];
> +     struct Stat stat;
> +     struct timeval tv;
> +
> +     if (argc != 3) {
> +             fprintf(stderr, "remove queue: need specify "
> +                             "threshold in seconds\n");
> +             return -1;
> +     }
> +
> +     /*
> +      * Reject trailing characters: "2w" used to parse as 2 and was then
> +      * clamped to one day, deleting far more than the user asked for.
> +      * A long avoids truncating large values into an int.
> +      */
> +     threshold = strtol(argv[2], &p, 10);
> +     if (p == argv[2] || *p != '\0') {
> +             fprintf(stderr, "threshold must be a number\n");
> +             return -1;
> +     }
> +     if (threshold < MIN_THRESHOLD) {
> +             threshold = MIN_THRESHOLD;
> +             fprintf(stdout, "threshold is less than %d seconds, "
> +                     "set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD);
> +     }
> +
> +     gettimeofday(&tv, NULL);
> +
> +     rc = zk_get_children(QUEUE_ZONE, &strs);
> +     switch (rc) {
> +     case ZOK:
> +             FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) {
> +                     len = sizeof(data);
> +                     rc = zk_get_data(path, data, &len, &stat);
> +                     if (rc == ZNONODE)
> +                             continue; /* already gone */
> +                     if (rc != ZOK) {
> +                             fprintf(stderr, "failed to get data "
> +                                     "%s, %s\n",
> +                                     path, zerror(rc));
> +                             goto err;
> +                     }
> +                     /* keep nodes modified within the last threshold s */
> +                     if (stat.mtime / 1000 >= tv.tv_sec - threshold)
> +                             continue;
> +
> +                     rc = zk_delete_node(path);
> +                     if (rc == ZNONODE)
> +                             continue; /* deleted by someone else */
> +                     if (rc != ZOK) {
> +                             fprintf(stderr, "failed to delete "
> +                                     "%s, %s\n",
> +                                     path, zerror(rc));
> +                             goto err;
> +                     }
> +
> +                     deleted++;
> +                     if (deleted % 100 == 0)
> +                             fprintf(stdout, "%d queue nodes are deleted\n",
> +                                             deleted);
> +             }
> +             break;
> +     default:
> +             goto err;
> +     }
> +
> +     fprintf(stdout, "completed. %d queue nodes are deleted\n", deleted);
> +     return 0;
> +err:
> +     fprintf(stderr, "failed to purge %s, %s\n", QUEUE_ZONE, zerror(rc));
> +     return -1;
> +}
> +
>  static struct control_handler {
>       const char *name;
>       int (*execute)(int, char **);
> @@ -137,6 +366,8 @@ static struct control_handler {
>  } handlers[] = {
>       { "kill", do_kill, "Kill the session" },
>       { "remove", do_remove, "Remove the node recursively" },
> +     { "list", do_list, "List the data in queue node" },
> +     { "purge", do_purge, "Remove the data in queue node" },
>       { NULL, NULL, NULL },
>  };
>  
> -- 
> 1.8.3.2
> 
> 
> -- 
> sheepdog mailing list
> [email protected]
> http://lists.wpkg.org/mailman/listinfo/sheepdog
-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to