Le 09/11/2020 à 13:10, Maciej Zdeb a écrit :
I've played a little bit with the patch and it led me to the backend.c file and
the connect_server() function
int connect_server(struct stream *s)
{
[...]
if (!conn_xprt_ready(srv_conn) && !srv_conn->mux) {
/* set the correct protocol on the output stream interface */
if (srv)
conn_prepare(srv_conn,
protocol_by_family(srv_conn->dst->ss_family), srv->xprt);
else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
/* proxies exclusively run on raw_sock right now */
conn_prepare(srv_conn,
protocol_by_family(srv_conn->dst->ss_family), xprt_get(XPRT_RAW));
if (!(srv_conn->ctrl)) {
conn_free(srv_conn);
return SF_ERR_INTERNAL;
}
}
else {
conn_free(srv_conn);
return SF_ERR_INTERNAL; /* how did we get there ? */
}
// THIS ONE IS OK
TEST_STRM(s);
//////////////////////////////
srv_cs = si_alloc_cs(&s->si[1], srv_conn);
// FAIL
TEST_STRM(s);
//////////////////////////////
if (!srv_cs) {
conn_free(srv_conn);
return SF_ERR_RESOURCE;
}
Hi,
In fact, this crash occurs because of Willy's patch. It was not designed to
handle non-h2 connections. Here the crash happens on a TCP connection, used by a
SPOE applet for instance.
I updated his patch. First, I added some calls to TEST_STRM() in the SPOE code,
to be sure. I also explicitly set the stream task to NULL in stream_free() to
catch late wakeups in the SPOE. Finally, I modified testcorrupt(). I hope this
one is correct. But if I missed something, you may keep only the last
ABORT_NOW() in testcorrupt() and replace the others with a return statement, just
like in Willy's patch.
--
Christopher Faulet
>From ba99e0eedf1730970f1d0b5bb67f24ef79117207 Mon Sep 17 00:00:00 2001
From: Christopher Faulet <cfau...@haproxy.com>
Date: Mon, 9 Nov 2020 14:37:57 +0100
Subject: [PATCH] EXP: try to spot where h2s->subs changes
---
include/haproxy/bug.h | 7 ++++++
src/flt_spoe.c | 8 +++++++
src/mux_h2.c | 25 ++++++++++++++++++++
src/stream.c | 55 +++++++++++++++++++++++++++++++++++++++++++
4 files changed, 95 insertions(+)
diff --git a/include/haproxy/bug.h b/include/haproxy/bug.h
index a008126f5c..c650f60b8c 100644
--- a/include/haproxy/bug.h
+++ b/include/haproxy/bug.h
@@ -166,6 +166,13 @@ struct mem_stats {
})
#endif /* DEBUG_MEM_STATS*/
+/* Debug hook: declares testcorrupt() locally and calls it on <ptr>. */
+#define TEST_CS(ptr) do { extern void testcorrupt(const void *); testcorrupt(ptr); } while (0)
+/* Runs TEST_CS() on <si>->end, but only when <si> is non-NULL. */
+#define TEST_SI(si) do { if ((si)) TEST_CS((si)->end); } while (0)
+/* Runs TEST_SI() on both stream interfaces of <s>, but only when <s> is non-NULL. */
+#define TEST_STRM(s) do { if ((s)) { TEST_SI(&(s)->si[0]); TEST_SI(&(s)->si[1]);} } while (0)
+
#endif /* _HAPROXY_BUG_H */
/*
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index cf5fc7a4c0..6899b16e66 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1255,6 +1255,7 @@ spoe_release_appctx(struct appctx *appctx)
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
@@ -1265,6 +1266,7 @@ spoe_release_appctx(struct appctx *appctx)
ctx->spoe_appctx = NULL;
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
@@ -1279,6 +1281,7 @@ spoe_release_appctx(struct appctx *appctx)
spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
@@ -1288,6 +1291,7 @@ spoe_release_appctx(struct appctx *appctx)
spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (spoe_appctx->status_code + 0x100);
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
}
@@ -1491,6 +1495,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
ctx->spoe_appctx = NULL;
ctx->state = SPOE_CTX_ST_ERROR;
ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
*skip = 1;
break;
@@ -1524,6 +1529,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
goto end;
@@ -1613,6 +1619,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
}
else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
break;
}
@@ -2830,6 +2837,7 @@ spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
static int
spoe_wakeup_context(struct spoe_context *ctx)
{
+ TEST_STRM(ctx->strm);
task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
return 1;
}
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 5830fdbd90..a0d31911a3 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -6251,3 +6251,28 @@ static int init_h2()
}
REGISTER_POST_CHECK(init_h2);
+
+/* TEST_CS() backend: aborts if <ptr> is a conn-stream on the h2 mux whose h2s
+ * is inconsistent. <ptr> is const to match the declaration made in TEST_CS(). */
+void testcorrupt(const void *ptr)
+{
+	const struct conn_stream *cs = objt_cs((void *)ptr);
+	const struct h2s *h2s;
+
+	if (!cs || cs->conn->mux != &h2_ops)
+		return;
+
+	/* mux installed, thus the conn-stream ctx must be an h2s */
+	h2s = cs->ctx;
+	if (!h2s)
+		ABORT_NOW();
+	/* the h2s must point back to this conn-stream */
+	if (h2s->cs != cs)
+		ABORT_NOW();
+	/* the h2s must have an h2c, on the same connection as the conn-stream */
+	if (!h2s->h2c || !h2s->h2c->conn || h2s->h2c->conn != cs->conn)
+		ABORT_NOW();
+	/* bit 0 of the subscriber pointer is not expected to be set */
+	if ((long)h2s->subs & 1)
+		ABORT_NOW();
+}
diff --git a/src/stream.c b/src/stream.c
index 43f1432979..931c8abd95 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -531,6 +531,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
* the caller must handle the task_wakeup
*/
DBG_TRACE_LEAVE(STRM_EV_STRM_NEW, s);
+ TEST_STRM(s);
return s;
/* Error unrolling */
@@ -542,6 +543,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
out_fail_alloc_si1:
tasklet_free(s->si[0].wait_event.tasklet);
out_fail_alloc:
+ TEST_STRM(s);
pool_free(pool_head_stream, s);
DBG_TRACE_DEVEL("leaving on error", STRM_EV_STRM_NEW|STRM_EV_STRM_ERR);
return NULL;
@@ -565,6 +567,7 @@ static void stream_free(struct stream *s)
* that walking over a task list never exhibits a dying stream.
*/
s->task->context = NULL;
+ s->task = NULL;
__ha_barrier_store();
pendconn_free(s);
@@ -1497,6 +1500,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
struct stream_interface *si_f, *si_b;
unsigned int rate;
+ TEST_STRM(s);
+
DBG_TRACE_ENTER(STRM_EV_STRM_PROC, s);
activity[tid].stream_calls++;
@@ -1594,6 +1599,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
resync_stream_interface:
+ TEST_STRM(s);
+
/* below we may emit error messages so we have to ensure that we have
* our buffers properly allocated.
*/
@@ -1658,6 +1665,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
/* note: maybe we should process connection errors here ? */
}
+ TEST_STRM(s);
+
if (si_state_in(si_b->state, SI_SB_CON|SI_SB_RDY)) {
/* we were trying to establish a connection on the server side,
* maybe it succeeded, maybe it failed, maybe we timed out, ...
@@ -1677,6 +1686,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
* SI_ST_ASS/SI_ST_TAR/SI_ST_REQ for retryable errors.
*/
}
+ TEST_STRM(s);
+
rq_prod_last = si_f->state;
rq_cons_last = si_b->state;
@@ -1707,12 +1718,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
}
+ TEST_STRM(s);
+
/*
* Note: of the transient states (REQ, CER, DIS), only REQ may remain
* at this point.
*/
resync_request:
+ TEST_STRM(s);
+
/* Analyse request */
if (((req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
((req->flags ^ rqf_last) & CF_MASK_STATIC) ||
@@ -1811,8 +1826,12 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
* analysers (eg: HTTP keep-alive).
*/
req_ana_back = req->analysers;
+ TEST_STRM(s);
+
resync_response:
+ TEST_STRM(s);
+
/* Analyse response */
if (((res->flags & ~rpf_last) & CF_MASK_ANALYSER) ||
@@ -1875,6 +1894,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
goto resync_response;
}
+ TEST_STRM(s);
+
/* maybe someone has added some request analysers, so we must check and loop */
if (req->analysers & ~req_ana_back)
goto resync_request;
@@ -1886,6 +1907,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
* both buffers.
*/
+ TEST_STRM(s);
+
/*
* Now we propagate unhandled errors to the stream. Normally
@@ -1990,6 +2013,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
sess_set_term_flags(s);
}
}
+ TEST_STRM(s);
+
/*
* Here we take care of forwarding unhandled data. This also includes
@@ -2034,6 +2059,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
}
+ TEST_STRM(s);
+
/* check if it is wise to enable kernel splicing to forward request data */
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward &&
@@ -2081,6 +2108,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
+ TEST_STRM(s);
+
/* we may have a pending connection request, or a connection waiting
* for completion.
*/
@@ -2120,6 +2149,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
/* Let's see if we can send the pending request now */
si_sync_send(si_b);
+ TEST_STRM(s);
+
/*
* Now forward all shutdown requests between both sides of the request buffer
@@ -2154,6 +2185,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
si_f->flags |= SI_FL_NOLINGER;
si_shutr(si_f);
}
+ TEST_STRM(s);
+
/* Benchmarks have shown that it's optimal to do a full resync now */
if (si_f->state == SI_ST_DIS ||
@@ -2168,6 +2201,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
/* perform output updates to the response buffer */
+ TEST_STRM(s);
+
/* If noone is interested in analysing data, it's time to forward
* everything. We configure the buffer to forward indefinitely.
* Note that we're checking CF_SHUTR_NOW as an indication of a possible
@@ -2227,6 +2262,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
}
+ TEST_STRM(s);
+
/* check if it is wise to enable kernel splicing to forward response data */
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward &&
@@ -2242,12 +2279,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
res->flags |= CF_KERN_SPLICING;
}
+ TEST_STRM(s);
+
/* reflect what the L7 analysers have seen last */
rpf_last = res->flags;
/* Let's see if we can send the pending response now */
si_sync_send(si_f);
+ TEST_STRM(s);
+
/*
* Now forward all shutdown requests between both sides of the buffer
*/
@@ -2280,6 +2321,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
si_shutr(si_b);
}
+ TEST_STRM(s);
+
if (si_f->state == SI_ST_DIS ||
si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) ||
(si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) ||
@@ -2295,6 +2338,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)
goto resync_request;
+ TEST_STRM(s);
+
/* we're interested in getting wakeups again */
si_f->flags &= ~SI_FL_DONT_WAKE;
si_b->flags &= ~SI_FL_DONT_WAKE;
@@ -2326,6 +2371,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
}
+ TEST_STRM(s);
+
if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, SI_SB_INI|SI_SB_CLO))) {
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
stream_process_counters(s);
@@ -2349,7 +2396,10 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
/* Reset pending events now */
s->pending_events = 0;
+
update_exp_and_leave:
+ TEST_STRM(s);
+
/* Note: please ensure that if you branch here you disable SI_FL_DONT_WAKE */
t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire),
tick_first(tick_first(req->rex, req->wex),
@@ -2373,6 +2423,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
stream_release_buffers(s);
+ TEST_STRM(s);
DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);
return t; /* nothing more to do */
@@ -2392,10 +2443,12 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
DISGUISE(write(1, trash.area, trash.data));
}
+ TEST_STRM(s);
s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
if (!(s->flags & SF_IGNORE))
stream_process_counters(s);
+ TEST_STRM(s);
if (s->txn && s->txn->status) {
int n;
@@ -2413,6 +2466,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
}
}
+ TEST_STRM(s);
/* let's do a final log if we need it */
if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
!(s->flags & SF_MONITOR) &&
@@ -2426,6 +2480,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
stream_update_time_stats(s);
/* the task MUST not be in the run queue anymore */
+ TEST_STRM(s);
stream_free(s);
task_destroy(t);
return NULL;
--
2.26.2