This patch modifies the thread pool so that asynchronous thread tasks can be replayed synchronously in replay mode.
Signed-off-by: Pavel Dovgalyuk <pavel.dovga...@ispras.ru> --- block/raw-posix.c | 6 ++++- block/raw-win32.c | 4 +++- include/block/thread-pool.h | 4 +++- replay/replay-events.c | 11 ++++++++++ replay/replay-internal.h | 1 + replay/replay.h | 2 ++ stubs/replay.c | 4 ++++ tests/test-thread-pool.c | 7 ++++-- thread-pool.c | 49 ++++++++++++++++++++++++++++++------------- 9 files changed, 66 insertions(+), 22 deletions(-) diff --git a/block/raw-posix.c b/block/raw-posix.c index e51293a..f878e06 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -1073,7 +1073,9 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, int fd, trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, + qiov ? qiov->replay : false, + qiov ? qiov->replay_step : 0); } static BlockAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -1986,7 +1988,7 @@ static BlockAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, false, 0); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index 06243d7..7ac693b 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -158,7 +158,9 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, + qiov ? qiov->replay : false, + qiov ? 
qiov->replay_step : 0); } int qemu_ftruncate64(int fd, int64_t length) diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 42eb5e8..801ac00 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -29,9 +29,11 @@ void thread_pool_free(ThreadPool *pool); BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, ThreadPoolFunc *func, void *arg, - BlockCompletionFunc *cb, void *opaque); + BlockCompletionFunc *cb, void *opaque, + bool replay, uint64_t replay_step); int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, void *arg); void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); +void thread_pool_work(ThreadPool *pool, void *r); #endif diff --git a/replay/replay-events.c b/replay/replay-events.c index 7e82b61..f4ec702 100755 --- a/replay/replay-events.c +++ b/replay/replay-events.c @@ -12,6 +12,7 @@ #include "qemu-common.h" #include "replay.h" #include "replay-internal.h" +#include "block/thread-pool.h" typedef struct Event { ReplayAsyncEventKind event_kind; @@ -39,6 +40,9 @@ static void replay_run_event(Event *event) case REPLAY_ASYNC_EVENT_BH: aio_bh_call(event->opaque); break; + case REPLAY_ASYNC_EVENT_THREAD: + thread_pool_work((ThreadPool *)event->opaque, event->opaque2); + break; default: fprintf(stderr, "Replay: invalid async event ID (%d) in the queue\n", event->event_kind); @@ -130,6 +134,11 @@ void replay_add_bh_event(void *bh, uint64_t id) replay_add_event_internal(REPLAY_ASYNC_EVENT_BH, bh, NULL, id); } +void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id) +{ + replay_add_event_internal(REPLAY_ASYNC_EVENT_THREAD, opaque, opaque2, id); +} + /* Called with replay mutex locked */ void replay_save_events(int opt) { @@ -145,6 +154,7 @@ void replay_save_events(int opt) /* save event-specific data */ switch (event->event_kind) { case REPLAY_ASYNC_EVENT_BH: + case REPLAY_ASYNC_EVENT_THREAD: replay_put_qword(event->id); break; } @@ -179,6 +189,7 @@ void 
replay_read_events(int opt) /* Execute some events without searching them in the queue */ switch (read_event_kind) { case REPLAY_ASYNC_EVENT_BH: + case REPLAY_ASYNC_EVENT_THREAD: if (read_id == -1) { read_id = replay_get_qword(); } diff --git a/replay/replay-internal.h b/replay/replay-internal.h index 0e6ba2a..4d242aa 100755 --- a/replay/replay-internal.h +++ b/replay/replay-internal.h @@ -41,6 +41,7 @@ enum ReplayEvents { enum ReplayAsyncEventKind { REPLAY_ASYNC_EVENT_BH, + REPLAY_ASYNC_EVENT_THREAD, REPLAY_ASYNC_COUNT }; diff --git a/replay/replay.h b/replay/replay.h index 073e105..3110d46 100755 --- a/replay/replay.h +++ b/replay/replay.h @@ -108,5 +108,7 @@ bool replay_checkpoint(ReplayCheckpoint checkpoint); void replay_disable_events(void); /*! Adds BH event to the queue */ void replay_add_bh_event(void *bh, uint64_t id); +/*! Adds thread event to the queue */ +void replay_add_thread_event(void *pool, void *req, uint64_t id); #endif diff --git a/stubs/replay.c b/stubs/replay.c index 95b43f3..81eddae 100755 --- a/stubs/replay.c +++ b/stubs/replay.c @@ -30,3 +30,7 @@ uint64_t replay_get_current_step(void) { return 0; } + +void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id) +{ +} diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index 6a0b981..f32594c 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -56,7 +56,7 @@ static void test_submit_aio(void) { WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, - done_cb, &data); + done_cb, &data, false, 0); /* The callbacks are not called until after the first wait. 
*/ active = 1; @@ -120,7 +120,8 @@ static void test_submit_many(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i], + false, 0); } active = 100; @@ -149,7 +150,7 @@ static void do_test_cancel(bool sync) data[i].n = 0; data[i].ret = -EINPROGRESS; data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], - done_cb, &data[i]); + done_cb, &data[i], false, 0); } /* Starting the threads may be left to a bottom half. Let it diff --git a/thread-pool.c b/thread-pool.c index e2cac8e..f5a4dac 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -22,6 +22,7 @@ #include "trace.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" +#include "replay/replay.h" static void do_spawn_thread(ThreadPool *pool); @@ -74,6 +75,27 @@ struct ThreadPool { bool stopping; }; +void thread_pool_work(ThreadPool *pool, void *r) +{ + ThreadPoolElement *req = (ThreadPoolElement *)r; + int ret; + if (replay_mode == REPLAY_MODE_NONE) { + qemu_mutex_unlock(&pool->lock); + } + + ret = req->func(req->arg); + req->ret = ret; + /* Write ret before state. */ + smp_wmb(); + req->state = THREAD_DONE; + + if (replay_mode == REPLAY_MODE_NONE) { + qemu_mutex_lock(&pool->lock); + } + + qemu_bh_schedule(pool->completion_bh); +} + static void *worker_thread(void *opaque) { ThreadPool *pool = opaque; @@ -100,18 +122,12 @@ static void *worker_thread(void *opaque) req = QTAILQ_FIRST(&pool->request_list); QTAILQ_REMOVE(&pool->request_list, req, reqs); req->state = THREAD_ACTIVE; - qemu_mutex_unlock(&pool->lock); - - ret = req->func(req->arg); - - req->ret = ret; - /* Write ret before state. 
*/ - smp_wmb(); - req->state = THREAD_DONE; - - qemu_mutex_lock(&pool->lock); - qemu_bh_schedule(pool->completion_bh); + if (replay_mode != REPLAY_MODE_NONE && req->common.replay) { + replay_add_thread_event(pool, req, req->common.replay_step); + } else { + thread_pool_work(pool, req); + } } pool->cur_threads--; @@ -235,7 +251,8 @@ static const AIOCBInfo thread_pool_aiocb_info = { BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, ThreadPoolFunc *func, void *arg, - BlockCompletionFunc *cb, void *opaque) + BlockCompletionFunc *cb, void *opaque, + bool replay, uint64_t replay_step) { ThreadPoolElement *req; @@ -244,6 +261,8 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, req->arg = arg; req->state = THREAD_QUEUED; req->pool = pool; + req->common.replay = replay; + req->common.replay_step = replay_step; QLIST_INSERT_HEAD(&pool->head, req, all); @@ -254,8 +273,8 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool, spawn_thread(pool); } QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); - qemu_mutex_unlock(&pool->lock); qemu_sem_post(&pool->sem); + qemu_mutex_unlock(&pool->lock); return &req->common; } @@ -277,14 +296,14 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, { ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; assert(qemu_in_coroutine()); - thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc, false, 0); qemu_coroutine_yield(); return tpc.ret; } void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) { - thread_pool_submit_aio(pool, func, arg, NULL, NULL); + thread_pool_submit_aio(pool, func, arg, NULL, NULL, false, 0); } static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)