Module: kamailio
Branch: master
Commit: a850af45acc4a433706120110d1cb91cd885b33b
URL: https://github.com/kamailio/kamailio/commit/a850af45acc4a433706120110d1cb91cd885b33b

Author: Emmanuel Schmidbauer <emman...@getweave.com>
Committer: GitHub <nore...@github.com>
Date: 2017-02-09T16:06:40-05:00

Merge pull request #982 from kamailio/NSQ-child-process-rank

nsq: do not use PROC_SIPINIT rank for consumer workers

---

Modified: src/modules/nsq/nsq_mod.c
Modified: src/modules/nsq/nsq_mod.h

---

Diff:  https://github.com/kamailio/kamailio/commit/a850af45acc4a433706120110d1cb91cd885b33b.diff
Patch: https://github.com/kamailio/kamailio/commit/a850af45acc4a433706120110d1cb91cd885b33b.patch

---

diff --git a/src/modules/nsq/nsq_mod.c b/src/modules/nsq/nsq_mod.c
index cf6c978..12a9d69 100644
--- a/src/modules/nsq/nsq_mod.c
+++ b/src/modules/nsq/nsq_mod.c
@@ -159,7 +159,6 @@ static int fire_init_event(int rank)
 
 static int mod_init(void)
 {
-       int i;
        startup_time = (int) time(NULL);
 
        if (dbn_pua_mode == 1) {
@@ -208,26 +207,6 @@ static int mod_init(void)
        register_procs(total_workers);
        cfg_register_child(total_workers);
 
-       if (pipe(nsq_cmd_pipe_fds) < 0) {
-               LM_ERR("cmd pipe() failed\n");
-               return -1;
-       }
-
-       nsq_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * 
(dbn_consumer_workers) * 2 );
-       nsq_worker_pipes = (int*) shm_malloc(sizeof(int) * 
dbn_consumer_workers);
-       for (i=0; i < dbn_consumer_workers; i++) {
-               nsq_worker_pipes_fds[i*2] = nsq_worker_pipes_fds[i*2+1] = -1;
-               if (pipe(&nsq_worker_pipes_fds[i*2]) < 0) {
-                       LM_ERR("worker pipe(%d) failed\n", i);
-                       return -1;
-               }
-       }
-
-       nsq_cmd_pipe = nsq_cmd_pipe_fds[1];
-       for (i=0; i < dbn_consumer_workers; i++) {
-               nsq_worker_pipes[i] = nsq_worker_pipes_fds[i*2+1];
-       }
-
        return 0;
 }
 
@@ -240,25 +219,10 @@ int mod_register(char *path, int *dlflags, void *p1, void *p2)
        return register_trans_mod(path, mod_trans);
 }
 
-
-int set_non_blocking(int fd)
-{
-       int flags;
-
-       flags = fcntl(fd, F_GETFL);
-       if (flags < 0)
-               return flags;
-       flags |= O_NONBLOCK;
-       if (fcntl(fd, F_SETFL, flags) < 0)
-               return -1;
-
-       return 0;
-}
-
 /**
  *
  */
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int 
max_in_flight)
+void nsq_consumer_worker_proc(char *topic, char *channel, int max_in_flight)
 {
        struct ev_loop *loop;
        loop = ev_default_loop(0);
@@ -269,7 +233,6 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_i
        if (loop == NULL) {
                LM_ERR("cannot get libev loop\n");
        }
-       set_non_blocking(cmd_pipe);
 
        LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel 
[%s]\n", topic, channel);
        // setup the reader
@@ -285,7 +248,6 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_i
        }
 
        nsq_run(loop);
-       return 0;
 }
 
 /**
@@ -318,8 +280,10 @@ static int mod_child_init(int rank)
                                if (pid<0)
                                        return -1; /* error */
                                if (pid==0){
-                                       close(nsq_worker_pipes_fds[i*2+1]);
-                                       
return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, 
DEFAULT_CHANNEL, max_in_flight));
+                                       if (cfg_child_init()) return -1;
+                                       nsq_consumer_worker_proc(DEFAULT_TOPIC, 
DEFAULT_CHANNEL, max_in_flight);
+                                       LM_CRIT("nsq_consumer_worker_proc():: 
worker_process finished without exit!\n");
+                                       exit(-1);
                                }
                        }
                } else {
@@ -329,8 +293,10 @@ static int mod_child_init(int rank)
                                        if (pid<0)
                                                return -1; /* error */
                                        if (pid==0){
-                                               
close(nsq_worker_pipes_fds[i*2+1]);
-                                               
return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, 
tc->channel, max_in_flight));
+                                               if (cfg_child_init()) return -1;
+                                               
nsq_consumer_worker_proc(tc->topic, tc->channel, max_in_flight);
+                                               
LM_CRIT("nsq_consumer_worker_proc():: worker_process finished without exit!\n");
+                                               exit(-1);
                                        }
                                }
                                tc = tc->next;
@@ -367,6 +333,4 @@ static int mod_child_init(int rank)
  */
 static void mod_destroy(void) {
        free_tc_list(tc_list);
-       shm_free(nsq_worker_pipes_fds);
-       shm_free(nsq_worker_pipes);
 }
diff --git a/src/modules/nsq/nsq_mod.h b/src/modules/nsq/nsq_mod.h
index bbbc32c..72323a4 100644
--- a/src/modules/nsq/nsq_mod.h
+++ b/src/modules/nsq/nsq_mod.h
@@ -72,10 +72,6 @@ char nsq_json_escape_char = '%';
 int nsq_topic_channel_counter = 0;
 int dbn_consumer_workers = DBN_DEFAULT_NO_WORKERS;
 int startup_time = 0;
-int *nsq_worker_pipes_fds = NULL;
-int *nsq_worker_pipes = NULL;
-int nsq_cmd_pipe = 0;
-int nsq_cmd_pipe_fds[2] = {-1,-1};
 
 /* database connection */
 db1_con_t *nsq_pa_db = NULL;


_______________________________________________
sr-dev mailing list
sr-dev@lists.sip-router.org
http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev

Reply via email to