jaydoane commented on code in PR #5760:
URL: https://github.com/apache/couchdb/pull/5760#discussion_r2563043945
##########
src/mem3/test/eunit/mem3_rep_test.erl:
##########
@@ -173,6 +176,110 @@ replicate_with_purges(#{allsrc := AllSrc, alltgt :=
AllTgt}) ->
?assertEqual(#{}, SDocs),
?assertEqual(#{}, get_all_docs(AllTgt)).
+clean_purge_checkpoints(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+ DocSpec = #{docs => 10, delete => [5, 9], purge => [2, 4]},
+ add_test_docs(AllSrc, DocSpec),
+ % Add and purge some docs on target to excercise the pull_purges code path
+ add_test_docs(AllTgt, #{docs => 3, purge => [0, 2]}),
+ [Src] = lists:sort(mem3:local_shards(AllSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+ #shard{name = SrcName} = Src,
+
+ % Since we don't have multiple nodes running and are just replicating
+ % from one clustered db to another, we need to patch up the shard map
+ % during the replication so it looks targets are part of the shard maps
+ meck:expect(mem3, shards, fun(DbName) ->
+ case DbName == Src#shard.dbname of
+ true -> [Src, Tgt1, Tgt2];
+ false -> meck:passthrough([DbName])
+ end
+ end),
+
+ FakeTarget = '[email protected]',
+
+ % Add a mix of stale, invalid or deprecated purge checkpoints
+ [Uuid1, Uuid2, Uuid3] = [couch_uuids:random() || _ <- lists:seq(1, 3)],
+
+ CheckpointIds = couch_util:with_db(SrcName, fun(Db) ->
+ Uuid = couch_db:get_uuid(Db),
+ Docs = [
+ % This one is ok and should not be cleaned up
+ #doc{
+ id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid1/binary>>,
+ body =
+ {[
+ {<<"type">>, <<"internal_replicator">>},
+ {<<"updated_on">>, os:system_time(second)},
+ {<<"purge_seq">>, 10042},
+ {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+ {<<"target">>, atom_to_binary(Tgt1#shard.node,
latin1)},
+ {<<"range">>, Tgt1#shard.range}
+ ]}
+ },
+ % Non-existent range. Should be cleaned up.
+ #doc{
+ id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid2/binary>>,
+ body =
+ {[
+ {<<"type">>, <<"internal_replicator">>},
+ {<<"updated_on">>, os:system_time(second)},
+ {<<"purge_seq">>, 10043},
+ {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+ {<<"target">>, atom_to_binary(Tgt1#shard.node,
latin1)},
+ {<<"range">>, [0, 1]}
+ ]}
+ },
+ % Non-existent target. Shoudl be cleaned up.
Review Comment:
```suggestion
% Non-existent target. Should be cleaned up.
```
##########
src/mem3/src/mem3_rep.erl:
##########
@@ -166,44 +171,98 @@ local_id_hash(Thing) ->
couch_util:encodeBase64Url(couch_hash:md5_hash(?term_to_bin(Thing))).
make_purge_id(SourceUUID, TargetUUID) ->
- <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+ <<?PURGE_PREFIX, SourceUUID/binary, "-", TargetUUID/binary>>.
-verify_purge_checkpoint(DbName, Props) ->
- try
- Type = couch_util:get_value(<<"type">>, Props),
- if
- Type =/= <<"internal_replication">> ->
- false;
- true ->
- SourceBin = couch_util:get_value(<<"source">>, Props),
- TargetBin = couch_util:get_value(<<"target">>, Props),
- Range = couch_util:get_value(<<"range">>, Props),
+remote_id_to_local(<<?PURGE_PREFIX, Remote:?UUID_SIZE/binary, "-",
Local:?UUID_SIZE/binary>>) ->
+ <<?PURGE_PREFIX, Local/binary, "-", Remote/binary>>.
- Source = binary_to_existing_atom(SourceBin, latin1),
- Target = binary_to_existing_atom(TargetBin, latin1),
+% If the shard map changed, nodes are decomissioned, or user upgraded from a
+% version before 3.6 we may have some some checkpoints to clean up. Call this
+% function before compacting, right before we calculate the minimum purge
+% sequence, and also before we replicate purges to/from other copies.
+%
+cleanup_purge_checkpoints(ShardName) when is_binary(ShardName) ->
+ couch_util:with_db(ShardName, fun(Db) -> cleanup_purge_checkpoints(Db)
end);
+cleanup_purge_checkpoints(Db) ->
+ Shards = shards(couch_db:name(Db)),
+ UUID = couch_db:get_uuid(Db),
+ FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+ case Id of
+ <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-",
_:?UUID_SIZE/binary>> ->
+ case verify_checkpoint_shard(Shards, Props) of
+ true -> {ok, Acc};
+ false -> {ok, [Id | Acc]}
+ end;
+ <<?PURGE_PREFIX, _:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> ->
+ % Cleanup checkpoints not originating at the current shard.
+ % Previously, before version 3.6, during a pull from shard B to
+ % shard A we checkpointed on target B with doc ID
+ % mem3-purge-$AUuid-$BUuid. That created a redunant checkpoint
+ % which was the same as target B pushing changes to target A,
+ % which already had a checkpoint: mem3-purge-$BUuid-$AUuid,
+ % with the same direction and same purge sequence ID. So here
+ % we remove those reduntant checkpoints.
Review Comment:
```suggestion
% we remove those redundant checkpoints.
```
##########
src/mem3/src/mem3_rep.erl:
##########
@@ -166,44 +171,98 @@ local_id_hash(Thing) ->
couch_util:encodeBase64Url(couch_hash:md5_hash(?term_to_bin(Thing))).
make_purge_id(SourceUUID, TargetUUID) ->
- <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+ <<?PURGE_PREFIX, SourceUUID/binary, "-", TargetUUID/binary>>.
-verify_purge_checkpoint(DbName, Props) ->
- try
- Type = couch_util:get_value(<<"type">>, Props),
- if
- Type =/= <<"internal_replication">> ->
- false;
- true ->
- SourceBin = couch_util:get_value(<<"source">>, Props),
- TargetBin = couch_util:get_value(<<"target">>, Props),
- Range = couch_util:get_value(<<"range">>, Props),
+remote_id_to_local(<<?PURGE_PREFIX, Remote:?UUID_SIZE/binary, "-",
Local:?UUID_SIZE/binary>>) ->
+ <<?PURGE_PREFIX, Local/binary, "-", Remote/binary>>.
- Source = binary_to_existing_atom(SourceBin, latin1),
- Target = binary_to_existing_atom(TargetBin, latin1),
+% If the shard map changed, nodes are decomissioned, or user upgraded from a
+% version before 3.6 we may have some some checkpoints to clean up. Call this
+% function before compacting, right before we calculate the minimum purge
+% sequence, and also before we replicate purges to/from other copies.
+%
+cleanup_purge_checkpoints(ShardName) when is_binary(ShardName) ->
+ couch_util:with_db(ShardName, fun(Db) -> cleanup_purge_checkpoints(Db)
end);
+cleanup_purge_checkpoints(Db) ->
+ Shards = shards(couch_db:name(Db)),
+ UUID = couch_db:get_uuid(Db),
+ FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+ case Id of
+ <<?PURGE_PREFIX, UUID:?UUID_SIZE/binary, "-",
_:?UUID_SIZE/binary>> ->
+ case verify_checkpoint_shard(Shards, Props) of
+ true -> {ok, Acc};
+ false -> {ok, [Id | Acc]}
+ end;
+ <<?PURGE_PREFIX, _:?UUID_SIZE/binary, "-", _:?UUID_SIZE/binary>> ->
+ % Cleanup checkpoints not originating at the current shard.
+ % Previously, before version 3.6, during a pull from shard B to
+ % shard A we checkpointed on target B with doc ID
+ % mem3-purge-$AUuid-$BUuid. That created a redunant checkpoint
Review Comment:
```suggestion
% mem3-purge-$AUuid-$BUuid. That created a redundant
checkpoint
```
##########
src/mem3/test/eunit/mem3_rep_test.erl:
##########
@@ -173,6 +176,110 @@ replicate_with_purges(#{allsrc := AllSrc, alltgt :=
AllTgt}) ->
?assertEqual(#{}, SDocs),
?assertEqual(#{}, get_all_docs(AllTgt)).
+clean_purge_checkpoints(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+ DocSpec = #{docs => 10, delete => [5, 9], purge => [2, 4]},
+ add_test_docs(AllSrc, DocSpec),
+ % Add and purge some docs on target to excercise the pull_purges code path
+ add_test_docs(AllTgt, #{docs => 3, purge => [0, 2]}),
+ [Src] = lists:sort(mem3:local_shards(AllSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+ #shard{name = SrcName} = Src,
+
+ % Since we don't have multiple nodes running and are just replicating
+ % from one clustered db to another, we need to patch up the shard map
+ % during the replication so it looks targets are part of the shard maps
+ meck:expect(mem3, shards, fun(DbName) ->
+ case DbName == Src#shard.dbname of
+ true -> [Src, Tgt1, Tgt2];
+ false -> meck:passthrough([DbName])
+ end
+ end),
+
+ FakeTarget = '[email protected]',
+
+ % Add a mix of stale, invalid or deprecated purge checkpoints
+ [Uuid1, Uuid2, Uuid3] = [couch_uuids:random() || _ <- lists:seq(1, 3)],
+
+ CheckpointIds = couch_util:with_db(SrcName, fun(Db) ->
+ Uuid = couch_db:get_uuid(Db),
+ Docs = [
+ % This one is ok and should not be cleaned up
+ #doc{
+ id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid1/binary>>,
+ body =
+ {[
+ {<<"type">>, <<"internal_replicator">>},
+ {<<"updated_on">>, os:system_time(second)},
+ {<<"purge_seq">>, 10042},
+ {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+ {<<"target">>, atom_to_binary(Tgt1#shard.node,
latin1)},
+ {<<"range">>, Tgt1#shard.range}
+ ]}
+ },
+ % Non-existent range. Should be cleaned up.
+ #doc{
+ id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid2/binary>>,
+ body =
+ {[
+ {<<"type">>, <<"internal_replicator">>},
+ {<<"updated_on">>, os:system_time(second)},
+ {<<"purge_seq">>, 10043},
+ {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+ {<<"target">>, atom_to_binary(Tgt1#shard.node,
latin1)},
+ {<<"range">>, [0, 1]}
+ ]}
+ },
+ % Non-existent target. Shoudl be cleaned up.
+ #doc{
+ id = <<"_local/purge-mem3-", Uuid/binary, "-", Uuid3/binary>>,
+ body =
+ {[
+ {<<"type">>, <<"internal_replicator">>},
+ {<<"updated_on">>, os:system_time(second)},
+ {<<"purge_seq">>, 10044},
+ {<<"source">>, atom_to_binary(Src#shard.node, latin1)},
+ {<<"target">>, atom_to_binary(FakeTarget, latin1)},
+ {<<"range">>, Tgt1#shard.range}
+ ]}
+ },
+ % Deprecated checkpoint format. Should be cleaned up.
Review Comment:
Can you clarify how the format is deprecated?
##########
src/mem3/test/eunit/mem3_rep_test.erl:
##########
@@ -173,6 +176,110 @@ replicate_with_purges(#{allsrc := AllSrc, alltgt :=
AllTgt}) ->
?assertEqual(#{}, SDocs),
?assertEqual(#{}, get_all_docs(AllTgt)).
+clean_purge_checkpoints(#{allsrc := AllSrc, alltgt := AllTgt}) ->
+ DocSpec = #{docs => 10, delete => [5, 9], purge => [2, 4]},
+ add_test_docs(AllSrc, DocSpec),
+ % Add and purge some docs on target to excercise the pull_purges code path
+ add_test_docs(AllTgt, #{docs => 3, purge => [0, 2]}),
+ [Src] = lists:sort(mem3:local_shards(AllSrc)),
+ [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)),
+ #shard{name = SrcName} = Src,
+
+ % Since we don't have multiple nodes running and are just replicating
+ % from one clustered db to another, we need to patch up the shard map
+ % during the replication so it looks targets are part of the shard maps
Review Comment:
```suggestion
% during the replication so it looks like targets are part of the shard
maps
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]