Maciej, I wrote this ugly patch to try to crash as soon as possible when a corrupt h2s->subs is detected. The patch was written for 2.2. I only instrumented roughly 30 places in process_stream(), which is a fairly likely candidate. I just hope the corruption happens within the context of the stream itself; otherwise it will become really painful.
You can apply this patch on top of your existing changes. It will try to detect the presence of a non-zero lowest bit in the subs pointer (which should never happen). If we're lucky, it will crash inside process_stream() between two check points and we'll be able to narrow it down. If we're unlucky, it will crash when entering process_stream() and that will not be fun. If you want to play with it, you can apply TEST_SI() on stream_interface pointers (often called "si"), TEST_STRM() on stream pointers, and TEST_CS() on conn_stream pointers (often called "cs"). Please just let me know how it goes. Note: I tested it and it passes all regtests for me, so I'm reasonably confident it should not crash by accident. But I can't be sure, as I'm just using heuristics, so please do not put it in sensitive production! Thanks, Willy
>From b7638769b3ee38a23bf319df5338c0ba46d9f57e Mon Sep 17 00:00:00 2001 From: Willy Tarreau <w...@1wt.eu> Date: Fri, 6 Nov 2020 19:54:01 +0100 Subject: EXP: try to spot where h2s->subs changes --- include/haproxy/bug.h | 7 +++++++ src/mux_h2.c | 22 +++++++++++++++++++++ src/stream.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+) diff --git a/include/haproxy/bug.h b/include/haproxy/bug.h index a008126..c650f60 100644 --- a/include/haproxy/bug.h +++ b/include/haproxy/bug.h @@ -166,6 +166,13 @@ struct mem_stats { }) #endif /* DEBUG_MEM_STATS*/ + +#define TEST_CS(ptr) do { extern void testcorrupt(const void *); testcorrupt(ptr); } while (0) + +#define TEST_SI(si) do { if ((si)) TEST_CS((si)->end); } while (0) + +#define TEST_STRM(s) do { if ((s)) { TEST_SI(&(s)->si[0]); TEST_SI(&(s)->si[1]);} } while (0) + #endif /* _HAPROXY_BUG_H */ /* diff --git a/src/mux_h2.c b/src/mux_h2.c index 5830fdb..6b5a649 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -6251,3 +6251,25 @@ static int init_h2() } REGISTER_POST_CHECK(init_h2); + +void testcorrupt(void *ptr) +{ + const struct conn_stream *cs = objt_cs(ptr); + const struct h2s *h2s; + + if (!cs) + return; + + h2s = cs->ctx; + if (!h2s) + return; + + if (h2s->cs != cs) + return; + + if (!h2s->h2c || !h2s->h2c->conn || h2s->h2c->conn->mux != &h2_ops) + return; + + if ((long)h2s->subs & 1) + ABORT_NOW(); +} diff --git a/src/stream.c b/src/stream.c index 43f1432..6646d1a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -531,6 +531,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) * the caller must handle the task_wakeup */ DBG_TRACE_LEAVE(STRM_EV_STRM_NEW, s); + TEST_STRM(s); return s; /* Error unrolling */ @@ -542,6 +543,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) out_fail_alloc_si1: tasklet_free(s->si[0].wait_event.tasklet); out_fail_alloc: + TEST_STRM(s); pool_free(pool_head_stream, s); DBG_TRACE_DEVEL("leaving on error", 
STRM_EV_STRM_NEW|STRM_EV_STRM_ERR); return NULL; @@ -1497,6 +1499,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) struct stream_interface *si_f, *si_b; unsigned int rate; + TEST_STRM(s); + DBG_TRACE_ENTER(STRM_EV_STRM_PROC, s); activity[tid].stream_calls++; @@ -1594,6 +1598,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } resync_stream_interface: + TEST_STRM(s); + /* below we may emit error messages so we have to ensure that we have * our buffers properly allocated. */ @@ -1658,6 +1664,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) /* note: maybe we should process connection errors here ? */ } + TEST_STRM(s); + if (si_state_in(si_b->state, SI_SB_CON|SI_SB_RDY)) { /* we were trying to establish a connection on the server side, * maybe it succeeded, maybe it failed, maybe we timed out, ... @@ -1677,6 +1685,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) * SI_ST_ASS/SI_ST_TAR/SI_ST_REQ for retryable errors. */ } + TEST_STRM(s); + rq_prod_last = si_f->state; rq_cons_last = si_b->state; @@ -1707,12 +1717,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } } + TEST_STRM(s); + /* * Note: of the transient states (REQ, CER, DIS), only REQ may remain * at this point. */ resync_request: + TEST_STRM(s); + /* Analyse request */ if (((req->flags & ~rqf_last) & CF_MASK_ANALYSER) || ((req->flags ^ rqf_last) & CF_MASK_STATIC) || @@ -1811,8 +1825,12 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) * analysers (eg: HTTP keep-alive). 
*/ req_ana_back = req->analysers; + TEST_STRM(s); + resync_response: + TEST_STRM(s); + /* Analyse response */ if (((res->flags & ~rpf_last) & CF_MASK_ANALYSER) || @@ -1875,6 +1893,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) goto resync_response; } + TEST_STRM(s); + /* maybe someone has added some request analysers, so we must check and loop */ if (req->analysers & ~req_ana_back) goto resync_request; @@ -1886,6 +1906,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) * both buffers. */ + TEST_STRM(s); + /* * Now we propagate unhandled errors to the stream. Normally @@ -1990,6 +2012,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) sess_set_term_flags(s); } } + TEST_STRM(s); + /* * Here we take care of forwarding unhandled data. This also includes @@ -2034,6 +2058,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } } + TEST_STRM(s); + /* check if it is wise to enable kernel splicing to forward request data */ if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) && req->to_forward && @@ -2081,6 +2107,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } + TEST_STRM(s); + /* we may have a pending connection request, or a connection waiting * for completion. 
*/ @@ -2120,6 +2148,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) /* Let's see if we can send the pending request now */ si_sync_send(si_b); + TEST_STRM(s); + /* * Now forward all shutdown requests between both sides of the request buffer @@ -2154,6 +2184,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_f->flags |= SI_FL_NOLINGER; si_shutr(si_f); } + TEST_STRM(s); + /* Benchmarks have shown that it's optimal to do a full resync now */ if (si_f->state == SI_ST_DIS || @@ -2168,6 +2200,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) /* perform output updates to the response buffer */ + TEST_STRM(s); + /* If noone is interested in analysing data, it's time to forward * everything. We configure the buffer to forward indefinitely. * Note that we're checking CF_SHUTR_NOW as an indication of a possible @@ -2227,6 +2261,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } } + TEST_STRM(s); + /* check if it is wise to enable kernel splicing to forward response data */ if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) && res->to_forward && @@ -2242,12 +2278,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) res->flags |= CF_KERN_SPLICING; } + TEST_STRM(s); + /* reflect what the L7 analysers have seen last */ rpf_last = res->flags; /* Let's see if we can send the pending response now */ si_sync_send(si_f); + TEST_STRM(s); + /* * Now forward all shutdown requests between both sides of the buffer */ @@ -2280,6 +2320,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_shutr(si_b); } + TEST_STRM(s); + if (si_f->state == SI_ST_DIS || si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) || (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) || @@ -2295,6 +2337,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) if (((req->flags ^ 
rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER) goto resync_request; + TEST_STRM(s); + /* we're interested in getting wakeups again */ si_f->flags &= ~SI_FL_DONT_WAKE; si_b->flags &= ~SI_FL_DONT_WAKE; @@ -2326,6 +2370,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } } + TEST_STRM(s); + if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, SI_SB_INI|SI_SB_CLO))) { if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE)) stream_process_counters(s); @@ -2349,7 +2395,10 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) /* Reset pending events now */ s->pending_events = 0; + update_exp_and_leave: + TEST_STRM(s); + /* Note: please ensure that if you branch here you disable SI_FL_DONT_WAKE */ t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire), tick_first(tick_first(req->rex, req->wex), @@ -2373,6 +2422,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES); stream_release_buffers(s); + TEST_STRM(s); DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s); return t; /* nothing more to do */ @@ -2392,10 +2442,12 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) DISGUISE(write(1, trash.area, trash.data)); } + TEST_STRM(s); s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now); if (!(s->flags & SF_IGNORE)) stream_process_counters(s); + TEST_STRM(s); if (s->txn && s->txn->status) { int n; @@ -2413,6 +2465,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) } } + TEST_STRM(s); /* let's do a final log if we need it */ if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait && !(s->flags & SF_MONITOR) && @@ -2426,6 +2479,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) stream_update_time_stats(s); /* the task MUST not be in the 
run queue anymore */ + TEST_STRM(s); stream_free(s); task_destroy(t); return NULL; -- 2.9.0