This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1b30d0810ac0d985676c90dc1b6532f9fb84284c Author: Paul J. Davis <paul.joseph.da...@gmail.com> AuthorDate: Fri Nov 13 14:08:52 2020 -0600 Allow snapshots through fabric2_fdb:transactional --- src/fabric/src/fabric2_fdb.erl | 17 ++++ src/fabric/test/fabric2_snapshot_tests.erl | 134 +++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index aa2bade..1690f2f 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -18,6 +18,8 @@ transactional/3, transactional/2, + with_snapshot/2, + create/2, open/2, ensure_current/1, @@ -150,6 +152,11 @@ transactional(#{tx := undefined} = Db, Fun) -> transactional(Db#{reopen => true}, Fun) end) end; +transactional(#{tx := {erlfdb_snapshot, _}} = Db, Fun) -> + DbName = maps:get(name, Db, undefined), + with_span(Fun, #{'db.name' => DbName}, fun() -> + Fun(Db) + end); transactional(#{tx := {erlfdb_transaction, _}} = Db, Fun) -> DbName = maps:get(name, Db, undefined), @@ -183,6 +190,14 @@ do_transaction(Fun, LayerPrefix) when is_function(Fun, 1) -> end. +with_snapshot(#{tx := {erlfdb_transaction, _} = Tx} = TxDb, Fun) -> + SSDb = TxDb#{tx := erlfdb:snapshot(Tx)}, + Fun(SSDb); + +with_snapshot(#{tx := {erlfdb_snapshot, _}} = SSDb, Fun) -> + Fun(SSDb). + + create(#{} = Db0, Options) -> #{ name := DbName, @@ -1865,6 +1880,8 @@ get_db_handle() -> end. +require_transaction(#{tx := {erlfdb_snapshot, _}} = _Db) -> + ok; require_transaction(#{tx := {erlfdb_transaction, _}} = _Db) -> ok; require_transaction(#{} = _Db) -> diff --git a/src/fabric/test/fabric2_snapshot_tests.erl b/src/fabric/test/fabric2_snapshot_tests.erl new file mode 100644 index 0000000..37ba626 --- /dev/null +++ b/src/fabric/test/fabric2_snapshot_tests.erl @@ -0,0 +1,134 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric2_snapshot_tests). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("fabric2.hrl"). +-include("fabric2_test.hrl"). + + +fdb_ss_test_() -> + { + "Test snapshot usage", + setup, + fun setup/0, + fun cleanup/1, + with([ + ?TDEF(retry_without_snapshot), + ?TDEF(no_retry_with_snapshot) + ]) + }. + + +setup() -> + Ctx = test_util:start_couch([fabric]), + {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]), + {Db, Ctx}. + + +cleanup({Db, Ctx}) -> + ok = fabric2_db:delete(fabric2_db:name(Db), []), + test_util:stop_couch(Ctx). + + +retry_without_snapshot({Db, _}) -> + DbName = fabric2_db:name(Db), + put(retry_count, 0), + erase(conflict_pid), + InitDbSeq = fabric2_db:get_update_seq(Db), + DbSeq = fabric2_fdb:transactional(Db, fun(TxDb) -> + put(retry_count, get(retry_count) + 1), + + % Fetch the update_seq + Seq = fabric2_db:get_update_seq(TxDb), + + % Generate a no-op write so that we don't hit the + % optimization to skip commits on read-only + % transactions + bump_view_size(TxDb), + + % Generate a conflicting transaction while + % we're not yet committed + case get(conflict_pid) of + undefined -> + {Pid, Ref} = spawn_monitor(fun() -> generate_conflict(DbName) end), + receive {'DOWN', Ref, _, _, normal} -> ok end, + put(conflict_pid, Pid); + Pid when is_pid(Pid) -> + ok + end, + + Seq + end), + + ?assertEqual(2, get(retry_count)), + ?assertNotEqual(InitDbSeq, DbSeq). + + +no_retry_with_snapshot({Db, _}) -> + DbName = fabric2_db:name(Db), + put(retry_count, 0), + erase(conflict_pid), + InitDbSeq = fabric2_db:get_update_seq(Db), + DbSeq = fabric2_fdb:transactional(Db, fun(TxDb) -> + put(retry_count, get(retry_count) + 1), + + % Fetch the update_seq + Seq = fabric2_fdb:with_snapshot(TxDb, fun(SSDb) -> + fabric2_db:get_update_seq(SSDb) + end), + + % Generate a no-op write so that we don't hit the + % optimization to skip commits on read-only + % transactions + bump_view_size(TxDb), + + % Generate a conflicting transaction while + % we're not yet committed + case get(conflict_pid) of + undefined -> + {Pid, Ref} = spawn_monitor(fun() -> generate_conflict(DbName) end), + receive {'DOWN', Ref, _, _, normal} -> ok end, + put(conflict_pid, Pid); + Pid when is_pid(Pid) -> + ok + end, + + Seq + end), + + ?assertEqual(1, get(retry_count)), + ?assertEqual(InitDbSeq, DbSeq). + + +bump_view_size(TxDb) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + + DbTuple = {?DB_STATS, <<"sizes">>, <<"views">>}, + DbKey = erlfdb_tuple:pack(DbTuple, DbPrefix), + erlfdb:add(Tx, DbKey, 0). + + +generate_conflict(DbName) -> + {ok, Db} = fabric2_db:open(DbName, [{user_ctx, ?ADMIN_USER}]), + Doc = #doc{ + id = fabric2_util:uuid(), + body = {[{<<"foo">>, <<"bar">>}]} + }, + {ok, _} = fabric2_db:update_doc(Db, Doc).