Module: kamailio
Branch: master
Commit: 6d0ec36a4a3b721de2a05680a2db230600e4e494
URL: https://github.com/kamailio/kamailio/commit/6d0ec36a4a3b721de2a05680a2db230600e4e494

Author: Emmanuel Schmidbauer <emman...@getweave.com>
Committer: Emmanuel Schmidbauer <emman...@getweave.com>
Date: 2016-12-08T12:29:38-07:00

nsq: use max_in_flight value

---

Modified: src/modules/nsq/nsq_mod.c

---

Diff: https://github.com/kamailio/kamailio/commit/6d0ec36a4a3b721de2a05680a2db230600e4e494.diff
Patch: https://github.com/kamailio/kamailio/commit/6d0ec36a4a3b721de2a05680a2db230600e4e494.patch

---

diff --git a/src/modules/nsq/nsq_mod.c b/src/modules/nsq/nsq_mod.c
index 61d6334..6c23603 100644
--- a/src/modules/nsq/nsq_mod.c
+++ b/src/modules/nsq/nsq_mod.c
@@ -263,7 +263,7 @@ int set_non_blocking(int fd)
 /**
  *
  */
-int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel, int max_in_flight)
 {
        struct ev_loop *loop;
        loop = ev_default_loop(0);
@@ -279,6 +279,7 @@ int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
        LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
        // setup the reader
        rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, NULL, nsq_message_handler);
+       rdr->max_in_flight = max_in_flight;
 
        if (consumer_use_nsqd == 0) {
                snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
@@ -300,6 +301,11 @@ static int mod_child_init(int rank)
        int pid;
        int i;
        int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+       int max_in_flight = 1;
+
+       if (nsq_max_in_flight > 1) {
+               max_in_flight = nsq_max_in_flight;
+       }
 
        fire_init_event(rank);
 
@@ -318,7 +324,7 @@ static int mod_child_init(int rank)
                                        return -1; /* error */
                                if (pid==0){
                                        close(nsq_worker_pipes_fds[i*2+1]);
-                                       return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL));
+                                       return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL, max_in_flight));
                                }
                        }
                } else {
@@ -329,7 +335,7 @@ static int mod_child_init(int rank)
                                                return -1; /* error */
                                        if (pid==0){
                                                close(nsq_worker_pipes_fds[i*2+1]);
-                                               return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel));
+                                               return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel, max_in_flight));
                                        }
                                }
                                tc = tc->next;


_______________________________________________
sr-dev mailing list
sr-dev@lists.sip-router.org
http://lists.sip-router.org/cgi-bin/mailman/listinfo/sr-dev

Reply via email to