The following commit will introduce asynchronous commands. Let's make the session aware of its pending commands, so that we can do interesting things such as ordering the replies or cancelling pending operations when the client is gone.
The queue needs a lock, since QmpReturn may be called from any thread. Signed-off-by: Marc-André Lureau <marcandre.lur...@redhat.com> --- include/qapi/qmp/dispatch.h | 4 ++++ qapi/qmp-dispatch.c | 32 ++++++++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h index 6c0d21968e..7c9de9780d 100644 --- a/include/qapi/qmp/dispatch.h +++ b/include/qapi/qmp/dispatch.h @@ -16,6 +16,7 @@ #include "qemu/queue.h" #include "qapi/qmp/json-parser.h" +#include "qemu/thread.h" typedef struct QmpReturn QmpReturn; @@ -47,11 +48,14 @@ struct QmpSession { const QmpCommandList *cmds; JSONMessageParser parser; QmpDispatchReturn *return_cb; + QemuMutex pending_lock; + QTAILQ_HEAD(, QmpReturn) pending; }; struct QmpReturn { QmpSession *session; QDict *rsp; + QTAILQ_ENTRY(QmpReturn) entry; }; /** diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c index fcf6cb0bf8..aed5c91057 100644 --- a/qapi/qmp-dispatch.c +++ b/qapi/qmp-dispatch.c @@ -32,11 +32,24 @@ QmpReturn *qmp_return_new(QmpSession *session, const QObject *request) qdict_put_obj(qret->rsp, "id", id); } + qemu_mutex_lock(&session->pending_lock); + QTAILQ_INSERT_TAIL(&session->pending, qret, entry); + qemu_mutex_unlock(&session->pending_lock); + return qret; } void qmp_return_free(QmpReturn *qret) { + QmpSession *session = qret->session; + + if (session) { + qemu_mutex_lock(&session->pending_lock); + } + QTAILQ_REMOVE(&session->pending, qret, entry); + if (session) { + qemu_mutex_unlock(&session->pending_lock); + } qobject_unref(qret->rsp); g_free(qret); } @@ -44,7 +57,9 @@ void qmp_return_free(QmpReturn *qret) void qmp_return(QmpReturn *qret, QObject *rsp) { qdict_put_obj(qret->rsp, "return", rsp ?: QOBJECT(qdict_new())); - qret->session->return_cb(qret->session, qret->rsp); + if (qret->session) { + qret->session->return_cb(qret->session, qret->rsp); + } qmp_return_free(qret); } @@ -55,7 +70,9 @@ void qmp_return_error(QmpReturn *qret, 
Error *err) qdict_put_str(qdict, "desc", error_get_pretty(err)); qdict_put_obj(qret->rsp, "error", QOBJECT(qdict)); error_free(err); - qret->session->return_cb(qret->session, qret->rsp); + if (qret->session) { + qret->session->return_cb(qret->session, qret->rsp); + } qmp_return_free(qret); } @@ -220,17 +237,28 @@ void qmp_session_init(QmpSession *session, session, NULL); session->cmds = cmds; session->return_cb = return_cb; + qemu_mutex_init(&session->pending_lock); + QTAILQ_INIT(&session->pending); } void qmp_session_destroy(QmpSession *session) { + QmpReturn *ret, *next; + if (!session->return_cb) { return; } + qemu_mutex_lock(&session->pending_lock); + QTAILQ_FOREACH_SAFE(ret, &session->pending, entry, next) { + ret->session = NULL; + QTAILQ_REMOVE(&session->pending, ret, entry); + } + qemu_mutex_unlock(&session->pending_lock); session->cmds = NULL; session->return_cb = NULL; json_message_parser_destroy(&session->parser); + qemu_mutex_destroy(&session->pending_lock); } void qmp_dispatch(QmpSession *session, QObject *request, bool allow_oob) -- 2.24.0