This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch 3.3.x in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit e7cfbb2ccfbe415428a379e9156d0aa6d50924a6 Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Mon Aug 7 12:18:28 2023 -0400 Fix badmatch error when purge requests time out Previously, if some workers responded and some timed out, fabric_doc_purge would throw a badmatch error, as it mistakenly expected the set of worker_uuids remaining at timeout to match the initial set of worker_uuids. To prevent future accidental match errors, and simplify testing, move timeout handling to a separate function. This also made it possible to shorten the body of the go/3 function a bit. Since we're adding a new test, take the chance to use the newer `?TDEF` macro instead of the awkward `?_test(begin ...end)` construct. --- src/fabric/src/fabric_doc_purge.erl | 591 ++++++++++++++++++------------------ 1 file changed, 293 insertions(+), 298 deletions(-) diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl index 082666afb..5405ceb60 100644 --- a/src/fabric/src/fabric_doc_purge.erl +++ b/src/fabric/src/fabric_doc_purge.erl @@ -16,7 +16,6 @@ go/3 ]). --include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). 
-record(acc, { @@ -68,30 +67,12 @@ go(DbName, IdsRevs, Options) -> uuid_counts = UUIDCounts, w = w(DbName, Options) }, + Callback = fun handle_message/3, Acc2 = - try - rexi_utils:recv( - Workers, - #shard.ref, - fun handle_message/3, - Acc0, - infinity, - Timeout - ) - of - {ok, Acc1} -> - Acc1; - {timeout, Acc1} -> - #acc{ - worker_uuids = WorkerUUIDs, - resps = Resps - } = Acc1, - DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs], - fabric_util:log_timeout(DefunctWorkers, "purge_docs"), - NewResps = append_errors(timeout, WorkerUUIDs, Resps), - Acc1#acc{worker_uuids = [], resps = NewResps}; - Else -> - Else + try rexi_utils:recv(Workers, #shard.ref, Callback, Acc0, infinity, Timeout) of + {ok, Acc1} -> Acc1; + {timeout, Acc1} -> handle_timeout(Acc1); + Else -> Else after rexi_monitor:stop(RexiMon) end, @@ -127,6 +108,12 @@ handle_message({ok, Replies}, Worker, Acc) -> handle_message({bad_request, Msg}, _, _) -> throw({bad_request, Msg}). +handle_timeout(#acc{worker_uuids = DefunctWorkerUUIDs, resps = Resps} = Acc) -> + DefunctWorkers = [Worker || {Worker, _} <- DefunctWorkerUUIDs], + fabric_util:log_timeout(DefunctWorkers, "purge_docs"), + NewResps = append_errors(timeout, DefunctWorkerUUIDs, Resps), + Acc#acc{worker_uuids = [], resps = NewResps}. + create_reqs([], UUIDs, Reqs) -> {lists:reverse(UUIDs), lists:reverse(Reqs)}; create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) -> @@ -253,303 +240,311 @@ has_quorum(Resps, Count, W) -> end. -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). 
purge_test_() -> { setup, fun setup/0, fun teardown/1, - [ - t_w2_ok(), - t_w3_ok(), + with([ + ?TDEF(t_w2_ok), + ?TDEF(t_w3_ok), - t_w2_mixed_accepted(), - t_w3_mixed_accepted(), + ?TDEF(t_w2_mixed_accepted), + ?TDEF(t_w3_mixed_accepted), - t_w2_exit1_ok(), - t_w2_exit2_accepted(), - t_w2_exit3_error(), + ?TDEF(t_w2_exit1_ok), + ?TDEF(t_w2_exit2_accepted), + ?TDEF(t_w2_exit3_error), - t_w4_accepted(), + ?TDEF(t_w4_accepted), - t_mixed_ok_accepted(), - t_mixed_errors() - ] + ?TDEF(t_mixed_ok_accepted), + ?TDEF(t_mixed_errors), + ?TDEF(t_timeout) + ]) }. setup() -> meck:new(couch_log), meck:expect(couch_log, warning, fun(_, _) -> ok end), - meck:expect(couch_log, notice, fun(_, _) -> ok end). + meck:expect(couch_log, notice, fun(_, _) -> ok end), + meck:expect(couch_log, error, fun(_, _) -> ok end). teardown(_) -> meck:unload(). -t_w2_ok() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, true), - - Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), - ?assertEqual(Expect, Resps), - ?assertEqual(ok, resp_health(Resps)) - end). 
- -t_w3_ok() -> - ?_test(begin - Acc0 = create_init_acc(3), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(ok, resp_health(Resps)) - end). - -t_w2_mixed_accepted() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, - Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, - - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [ - {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, - {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} - ], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). 
- -t_w3_mixed_accepted() -> - ?_test(begin - Acc0 = create_init_acc(3), - Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, - Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, - - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [ - {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, - {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} - ], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - -t_w2_exit1_ok() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(ok, resp_health(Resps)) - end). 
- -t_w2_exit2_accepted() -> - ?_test(begin - Acc0 = create_init_acc(2), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). - -t_w2_exit3_error() -> - ?_test(begin - Acc0 = create_init_acc(2), - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [ - {error, internal_server_error}, - {error, internal_server_error} - ], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(error, resp_health(Resps)) - end). 
- -t_w4_accepted() -> +t_w2_ok(_) -> + Acc0 = create_init_acc(2), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, true), + + Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), + ?assertEqual(Expect, Resps), + ?assertEqual(ok, resp_health(Resps)). + +t_w3_ok(_) -> + Acc0 = create_init_acc(3), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(ok, resp_health(Resps)). 
+ +t_w2_mixed_accepted(_) -> + Acc0 = create_init_acc(2), + Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, + Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, + + {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [ + {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, + {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} + ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)). + +t_w3_mixed_accepted(_) -> + Acc0 = create_init_acc(3), + Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]}, + Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]}, + + {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [ + {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]}, + {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]} + ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)). 
+ +t_w2_exit1_ok(_) -> + Acc0 = create_init_acc(2), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(ok, resp_health(Resps)). + +t_w2_exit2_accepted(_) -> + Acc0 = create_init_acc(2), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)). 
+ +t_w2_exit3_error(_) -> + Acc0 = create_init_acc(2), + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [ + {error, internal_server_error}, + {error, internal_server_error} + ], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(error, resp_health(Resps)). + +t_w4_accepted(_) -> % Make sure we return when all workers have responded % rather than wait around for a timeout if a user asks % for a qourum with more than the available number of % shards. - ?_test(begin - Acc0 = create_init_acc(4), - Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - ?assertEqual(2, length(Acc1#acc.worker_uuids)), - check_quorum(Acc1, false), - - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - ?assertEqual(1, length(Acc2#acc.worker_uuids)), - check_quorum(Acc2, false), - - {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), - ?assertEqual(0, length(Acc3#acc.worker_uuids)), - check_quorum(Acc3, true), - - Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). 
- -t_mixed_ok_accepted() -> - ?_test(begin - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} - ], - - Acc0 = #acc{ - worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), - w = 2 - }, - - Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]}, - Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), - {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), - {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4), - - Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), - ?assertEqual(Expect, Resps), - ?assertEqual(accepted, resp_health(Resps)) - end). 
- -t_mixed_errors() -> - ?_test(begin - WorkerUUIDs = [ - {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, - {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, - - {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, - {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} - ], - - Acc0 = #acc{ - worker_uuids = WorkerUUIDs, - resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), - uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), - w = 2 - }, - - Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, - ExitMsg = {rexi_EXIT, blargh}, - - {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), - {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), - {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), - {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), - {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4), - - Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}], - Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), - ?assertEqual(Expect, Resps), - ?assertEqual(error, resp_health(Resps)) - end). + Acc0 = create_init_acc(4), + Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + ?assertEqual(2, length(Acc1#acc.worker_uuids)), + check_quorum(Acc1, false), + + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + ?assertEqual(1, length(Acc2#acc.worker_uuids)), + check_quorum(Acc2, false), + + {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + ?assertEqual(0, length(Acc3#acc.worker_uuids)), + check_quorum(Acc3, true), + + Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)). 
+ +t_mixed_ok_accepted(_) -> + WorkerUUIDs = [ + {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, + + {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} + ], + + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + w = 2 + }, + + Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]}, + Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0), + {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1), + {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), + {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), + {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4), + + Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), + ?assertEqual(Expect, Resps), + ?assertEqual(accepted, resp_health(Resps)). 
+ +t_mixed_errors(_) -> + WorkerUUIDs = [ + {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, + + {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} + ], + + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + w = 2 + }, + + Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, + ExitMsg = {rexi_EXIT, blargh}, + + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2), + {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3), + {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4), + + Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}], + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5), + ?assertEqual(Expect, Resps), + ?assertEqual(error, resp_health(Resps)). 
+ +t_timeout(_) -> + WorkerUUIDs = [ + {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]}, + {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]}, + + {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]}, + {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]} + ], + + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]), + uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]), + w = 2 + }, + + Msg = {ok, [{ok, [{1, <<"foo">>}]}]}, + {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0), + {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1), + {ok, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2), + Acc4 = handle_timeout(Acc3), + Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc4), + ?assertEqual([{ok, [{1, <<"foo">>}]}, {error, timeout}], Resps). create_init_acc(W) -> UUID1 = <<"uuid1">>,