Maciej,

I wrote this ugly patch to try to crash as soon as possible when a corrupt
h2s->subs is detected. The patch was written for 2.2. I only instrumented
roughly 30 places in process_stream() which is a fairly likely candidate.
I just hope it happens within the context of the stream itself otherwise
it will become really painful.

You can apply this patch on top of your existing changes. It will try to
detect the presence of a non-zero lowest bit in the subs pointer (which
should never happen). If we're lucky it will crash inside process_stream()
between two points and we'll be able to narrow it down. If we're unlucky
it will crash when entering it and that will not be fun.

If you want to play with it, you can apply TEST_SI() on stream_interface
pointers (often called "si"), TEST_STRM() on stream pointers, and TEST_CS()
on conn_stream pointers (often called "cs").

Please just let me know how it goes. Note, I tested it, it passes all
regtests for me so I'm reasonably confident it should not crash by
accident. But I can't be sure, I'm just using heuristics, so please do
not put it in sensitive production!

Thanks,
Willy
>From b7638769b3ee38a23bf319df5338c0ba46d9f57e Mon Sep 17 00:00:00 2001
From: Willy Tarreau <w...@1wt.eu>
Date: Fri, 6 Nov 2020 19:54:01 +0100
Subject: EXP: try to spot where h2s->subs changes

---
 include/haproxy/bug.h |  7 +++++++
 src/mux_h2.c          | 22 +++++++++++++++++++++
 src/stream.c          | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 83 insertions(+)

diff --git a/include/haproxy/bug.h b/include/haproxy/bug.h
index a008126..c650f60 100644
--- a/include/haproxy/bug.h
+++ b/include/haproxy/bug.h
@@ -166,6 +166,13 @@ struct mem_stats {
 })
 #endif /* DEBUG_MEM_STATS*/
 
+
+#define TEST_CS(ptr) do { extern void testcorrupt(const void *); 
testcorrupt(ptr); } while (0)
+
+#define TEST_SI(si) do { if ((si)) TEST_CS((si)->end); } while (0)
+
+#define TEST_STRM(s) do { if ((s)) { TEST_SI(&(s)->si[0]); 
TEST_SI(&(s)->si[1]);} } while (0)
+
 #endif /* _HAPROXY_BUG_H */
 
 /*
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 5830fdb..6b5a649 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -6251,3 +6251,25 @@ static int init_h2()
 }
 
 REGISTER_POST_CHECK(init_h2);
+
+void testcorrupt(void *ptr)
+{
+       const struct conn_stream *cs = objt_cs(ptr);
+       const struct h2s *h2s;
+
+       if (!cs)
+               return;
+
+       h2s = cs->ctx;
+       if (!h2s)
+               return;
+
+       if (h2s->cs != cs)
+               return;
+
+       if (!h2s->h2c || !h2s->h2c->conn || h2s->h2c->conn->mux != &h2_ops)
+               return;
+
+       if ((long)h2s->subs & 1)
+               ABORT_NOW();
+}
diff --git a/src/stream.c b/src/stream.c
index 43f1432..6646d1a 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -531,6 +531,7 @@ struct stream *stream_new(struct session *sess, enum 
obj_type *origin)
         * the caller must handle the task_wakeup
         */
        DBG_TRACE_LEAVE(STRM_EV_STRM_NEW, s);
+       TEST_STRM(s);
        return s;
 
        /* Error unrolling */
@@ -542,6 +543,7 @@ struct stream *stream_new(struct session *sess, enum 
obj_type *origin)
 out_fail_alloc_si1:
        tasklet_free(s->si[0].wait_event.tasklet);
  out_fail_alloc:
+       TEST_STRM(s);
        pool_free(pool_head_stream, s);
        DBG_TRACE_DEVEL("leaving on error", STRM_EV_STRM_NEW|STRM_EV_STRM_ERR);
        return NULL;
@@ -1497,6 +1499,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
        struct stream_interface *si_f, *si_b;
        unsigned int rate;
 
+       TEST_STRM(s);
+
        DBG_TRACE_ENTER(STRM_EV_STRM_PROC, s);
 
        activity[tid].stream_calls++;
@@ -1594,6 +1598,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
        }
 
  resync_stream_interface:
+       TEST_STRM(s);
+
        /* below we may emit error messages so we have to ensure that we have
         * our buffers properly allocated.
         */
@@ -1658,6 +1664,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                /* note: maybe we should process connection errors here ? */
        }
 
+       TEST_STRM(s);
+
        if (si_state_in(si_b->state, SI_SB_CON|SI_SB_RDY)) {
                /* we were trying to establish a connection on the server side,
                 * maybe it succeeded, maybe it failed, maybe we timed out, ...
@@ -1677,6 +1685,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                 * SI_ST_ASS/SI_ST_TAR/SI_ST_REQ for retryable errors.
                 */
        }
+       TEST_STRM(s);
+
 
        rq_prod_last = si_f->state;
        rq_cons_last = si_b->state;
@@ -1707,12 +1717,16 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                }
        }
 
+       TEST_STRM(s);
+
        /*
         * Note: of the transient states (REQ, CER, DIS), only REQ may remain
         * at this point.
         */
 
  resync_request:
+       TEST_STRM(s);
+
        /* Analyse request */
        if (((req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
            ((req->flags ^ rqf_last) & CF_MASK_STATIC) ||
@@ -1811,8 +1825,12 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
         * analysers (eg: HTTP keep-alive).
         */
        req_ana_back = req->analysers;
+       TEST_STRM(s);
+
 
  resync_response:
+       TEST_STRM(s);
+
        /* Analyse response */
 
        if (((res->flags & ~rpf_last) & CF_MASK_ANALYSER) ||
@@ -1875,6 +1893,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                        goto resync_response;
        }
 
+       TEST_STRM(s);
+
        /* maybe someone has added some request analysers, so we must check and 
loop */
        if (req->analysers & ~req_ana_back)
                goto resync_request;
@@ -1886,6 +1906,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
         * both buffers.
         */
 
+       TEST_STRM(s);
+
 
        /*
         * Now we propagate unhandled errors to the stream. Normally
@@ -1990,6 +2012,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                        sess_set_term_flags(s);
                }
        }
+       TEST_STRM(s);
+
 
        /*
         * Here we take care of forwarding unhandled data. This also includes
@@ -2034,6 +2058,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                }
        }
 
+       TEST_STRM(s);
+
        /* check if it is wise to enable kernel splicing to forward request 
data */
        if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
            req->to_forward &&
@@ -2081,6 +2107,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
        }
 
 
+       TEST_STRM(s);
+
        /* we may have a pending connection request, or a connection waiting
         * for completion.
         */
@@ -2120,6 +2148,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
 
        /* Let's see if we can send the pending request now */
        si_sync_send(si_b);
+       TEST_STRM(s);
+
 
        /*
         * Now forward all shutdown requests between both sides of the request 
buffer
@@ -2154,6 +2184,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                        si_f->flags |= SI_FL_NOLINGER;
                si_shutr(si_f);
        }
+       TEST_STRM(s);
+
 
        /* Benchmarks have shown that it's optimal to do a full resync now */
        if (si_f->state == SI_ST_DIS ||
@@ -2168,6 +2200,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
 
        /* perform output updates to the response buffer */
 
+       TEST_STRM(s);
+
        /* If noone is interested in analysing data, it's time to forward
         * everything. We configure the buffer to forward indefinitely.
         * Note that we're checking CF_SHUTR_NOW as an indication of a possible
@@ -2227,6 +2261,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                }
        }
 
+       TEST_STRM(s);
+
        /* check if it is wise to enable kernel splicing to forward response 
data */
        if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
            res->to_forward &&
@@ -2242,12 +2278,16 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                res->flags |= CF_KERN_SPLICING;
        }
 
+       TEST_STRM(s);
+
        /* reflect what the L7 analysers have seen last */
        rpf_last = res->flags;
 
        /* Let's see if we can send the pending response now */
        si_sync_send(si_f);
 
+       TEST_STRM(s);
+
        /*
         * Now forward all shutdown requests between both sides of the buffer
         */
@@ -2280,6 +2320,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                si_shutr(si_b);
        }
 
+       TEST_STRM(s);
+
        if (si_f->state == SI_ST_DIS ||
            si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) ||
            (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) ||
@@ -2295,6 +2337,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
        if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & 
CF_MASK_ANALYSER)
                goto resync_request;
 
+       TEST_STRM(s);
+
        /* we're interested in getting wakeups again */
        si_f->flags &= ~SI_FL_DONT_WAKE;
        si_b->flags &= ~SI_FL_DONT_WAKE;
@@ -2326,6 +2370,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                }
        }
 
+       TEST_STRM(s);
+
        if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, 
SI_SB_INI|SI_SB_CLO))) {
                if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & 
SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
                        stream_process_counters(s);
@@ -2349,7 +2395,10 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                /* Reset pending events now */
                s->pending_events = 0;
 
+
        update_exp_and_leave:
+       TEST_STRM(s);
+
                /* Note: please ensure that if you branch here you disable 
SI_FL_DONT_WAKE */
                t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 
: t->expire),
                                       tick_first(tick_first(req->rex, 
req->wex),
@@ -2373,6 +2422,7 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
 
                s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
                stream_release_buffers(s);
+       TEST_STRM(s);
 
                DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);
                return t; /* nothing more to do */
@@ -2392,10 +2442,12 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                DISGUISE(write(1, trash.area, trash.data));
        }
 
+       TEST_STRM(s);
        s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
        if (!(s->flags & SF_IGNORE))
                stream_process_counters(s);
 
+       TEST_STRM(s);
        if (s->txn && s->txn->status) {
                int n;
 
@@ -2413,6 +2465,7 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
                }
        }
 
+       TEST_STRM(s);
        /* let's do a final log if we need it */
        if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
            !(s->flags & SF_MONITOR) &&
@@ -2426,6 +2479,7 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
        stream_update_time_stats(s);
 
        /* the task MUST not be in the run queue anymore */
+       TEST_STRM(s);
        stream_free(s);
        task_destroy(t);
        return NULL;
-- 
2.9.0

Reply via email to