BBlack has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/382868 )
Change subject: Chain the purgers together.
......................................................................
Chain the purgers together.
This completes the ideas that began in the earlier "Remove
multi-head support from strq" commit. It hooks up the
commandline-specified purgers in a chain, such that the URLs
received via HTCP flow like so:
HTCP -> Receiver (parse, generate HTTP PURGE req)
     -> Purger0_Queue -> Purge0_Varnish (confirm response)
     -> Purger1_Queue -> Purge1_Varnish (confirm response)
     -> Purger2_Queue ...
This ensures purging happens serially according to the order the
purge addresses are specified on the commandline, with one purger
claiming completion over HTTP before the next even gets handed the
request to enqueue for itself, at the cost of multiple queues and
slight delays.
Note that the statistics still assume a single queue, so some counters
are now double-counted in places. This will be fixed shortly.
Change-Id: I0f03a07ddc89f11f6322d4f20ece6b9b7c87b785
---
M src/main.c
M src/purger.c
M src/purger.h
3 files changed, 18 insertions(+), 7 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/software/varnish/vhtcpd
refs/changes/68/382868/1
diff --git a/src/main.c b/src/main.c
index f1b0a83..6506725 100644
--- a/src/main.c
+++ b/src/main.c
@@ -399,10 +399,17 @@
// stats has a timeout callback for reporting
stats_init(loop, cfg->statsfile);
- // set up an array of purger objects
+ // set up an array of purger objects, and initialize them in reverse
+ // order so we can set up the purger->purger queueing chain
purgers = malloc(cfg->num_purgers * sizeof(purger_t*));
- for(unsigned i = 0; i < cfg->num_purgers; i++)
- purgers[i] = purger_new(loop, &cfg->purger_addrs[i],
cfg->max_queue_mb, cfg->io_timeout, cfg->idle_timeout);
+ {
+ purger_t* next_purger = NULL;
+ unsigned i = cfg->num_purgers - 1;
+ do {
+ purgers[i] = purger_new(loop, &cfg->purger_addrs[i], next_purger,
cfg->max_queue_mb, cfg->io_timeout, cfg->idle_timeout);
+ next_purger = purgers[i];
+ } while(i--);
+ }
// set up the singular receiver, with purger[0] as the dequeur
receiver_t* receiver = receiver_new(
diff --git a/src/purger.c b/src/purger.c
index 4e8f0b5..60a2bf1 100644
--- a/src/purger.c
+++ b/src/purger.c
@@ -151,6 +151,7 @@
char* outbuf;
char* inbuf;
strq_t* queue;
+ purger_t* next_purger;
struct ev_loop* loop;
ev_io* write_watcher;
ev_io* read_watcher;
@@ -562,10 +563,12 @@
// XXX pr.status_ok will just be for stats?
dmn_log_debug("purger: %s/%s -> purger_read_cb silent result:
successful response parsed", dmn_logf_anysin(&s->daddr), state_strs[s->state]);
- // reset i/o progress + inc connection reqs
- s->outbuf_bytes = s->outbuf_written = s->inbuf_parsed = 0;
+ // copy to next purger + inc counters + reset i/o progress
+ if(s->next_purger)
+ purger_enqueue(s->next_purger, s->outbuf, s->outbuf_bytes);
s->fd_reqs++;
stats.inpkts_sent++;
+ s->outbuf_bytes = s->outbuf_written = s->inbuf_parsed = 0;
// no matter which path, current timer needs to go
ev_timer_stop(s->loop, s->timeout_watcher);
@@ -648,7 +651,7 @@
}
}
-purger_t* purger_new(struct ev_loop* loop, const dmn_anysin_t* daddr, unsigned
max_mb, unsigned io_timeout, unsigned idle_timeout) {
+purger_t* purger_new(struct ev_loop* loop, const dmn_anysin_t* daddr,
purger_t* next_purger, unsigned max_mb, unsigned io_timeout, unsigned
idle_timeout) {
purger_t* s = calloc(1, sizeof(purger_t));
s->fd = -1;
s->inbuf_size = INBUF_INITSIZE;
@@ -656,6 +659,7 @@
s->inbuf = malloc(s->inbuf_size);
s->parser = malloc(sizeof(http_parser));
s->queue = strq_new(loop, max_mb);
+ s->next_purger = next_purger;
s->loop = loop;
s->io_timeout = io_timeout;
s->idle_timeout = idle_timeout;
diff --git a/src/purger.h b/src/purger.h
index 6084730..9966187 100644
--- a/src/purger.h
+++ b/src/purger.h
@@ -13,7 +13,7 @@
typedef struct purger purger_t;
// Sender does not own the loop, the caller does.
-purger_t* purger_new(struct ev_loop* loop, const dmn_anysin_t* daddr, unsigned
max_mb, unsigned io_timeout, unsigned idle_timeout);
+purger_t* purger_new(struct ev_loop* loop, const dmn_anysin_t* daddr,
purger_t* next_purger, unsigned max_mb, unsigned io_timeout, unsigned
idle_timeout);
void purger_enqueue(purger_t* s, const char* req, const unsigned req_len);
void purger_destroy(purger_t* s);
--
To view, visit https://gerrit.wikimedia.org/r/382868
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0f03a07ddc89f11f6322d4f20ece6b9b7c87b785
Gerrit-PatchSet: 1
Gerrit-Project: operations/software/varnish/vhtcpd
Gerrit-Branch: master
Gerrit-Owner: BBlack <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits