Hi I am new to erlang and riak. I started to use riak as a kv store couple of months ago. Now i want to implement a commit hook to riak so that riak could help me to make some statistics. i read some docs and write a pre-hook scripts, which will fetch the object key and store it into a set. This hook works fine if there is only one client write to riak, but if i increase the connection to riak writing, i found it lost some elements in the set. Looks like the crdt_op did not do the merge operation.And there is no obvious error in the log files.
Could someone help me to finger out what happened or what i has missed. i am using the riak 2.1.3 Thanks all! Here is the hook scripts: ------------------------------------------------------------ ------------------------------------------ -module(myhook). -export([pretest/1]). now_to_local_string({MegaSecs, Secs, MicroSecs}) -> LocalTime = calendar:now_to_local_time({MegaSecs, Secs, MicroSecs}), {{Year, Month, Day}, {Hour, Minute, _}} = LocalTime, TimeStr = lists:flatten(io_lib:format("~4..0w~2..0w~2..0w~2..0w~2..0w", [Year, Month, Day, Hour, Minute])), TimeStr. is_deleted(Object)-> case dict:find(<<"X-Riak-Deleted">>,riak_object:get_metadata(Object)) of {ok,_} -> true; _ -> false end. pretest(Object) -> % timer:sleep(10000), try ObjBucket = riak_object:bucket(Object), % riak_object:bucket(Obj). % {<<"cn-archive">>,<<"local-test">>} Bucket = element(2, ObjBucket), BucketType = element(1, ObjBucket), ObjKey = riak_object:key(Object), % Key = binary_to_list(ObjKey), % ObjData = riak_object:get_value(Object), % Msg = binary_to_list(ObjData), CommitItem = iolist_to_binary(mochijson2:encode({struct, [{b, Bucket}, {k, ObjKey}, {t, BucketType}]})), case is_deleted(Object) of true -> KeyPrefix = "delete"; _ -> KeyPrefix = "update" end, CurMin = now_to_local_string(os:timestamp()), IndexKey = binary:list_to_bin(io_lib:format("~s-~s", [CurMin, KeyPrefix])), %% Get a riak client {ok, C} = riak:local_client(), % get node obj ThisNode = atom_to_binary(node(), latin1), % get index obj and set context BType = <<"archive">>, B = <<"local-test">>, {SetObj, Context} = case C:get({BType, B}, IndexKey) of {error, notfound} -> ThisSetObj = riak_kv_crdt:new({BType, B}, IndexKey, riak_dt_orswot), {ThisSetObj, undefined}; {ok, ThisSetObj} -> % The datatype update requires the context if the value exists {{Ctx, _}, _} = riak_kv_crdt:value(ThisSetObj, riak_dt_orswot), {ThisSetObj, Ctx} end, UpdateIndex = [{add, CommitItem}], % UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, Context}, UpdateOp = {crdt_op, riak_dt_orswot, {update, UpdateIndex}, undefined}, NewObj = riak_kv_crdt:update(SetObj, ThisNode, UpdateOp), error_logger:info_msg("Updating index for ~s,to set ~s~n", [binary:bin_to_list(CommitItem), IndexKey]), C:put(NewObj), Object catch error:Error -> {fail, lists:flatten(io_lib:format("[PREHOOKEXCEPTION]~p",[Error]))} end. ------------------------------------------------------------ ------------------------------------------ This is the set bucket props ------------------------------------------------------------ ------------------------------------------ active: true allow_mult: true basic_quorum: false big_vclock: 50 chash_keyfun: {riak_core_util,chash_std_keyfun} claimant: 'riak@192.168.100.2' datatype: set dvv_enabled: true dw: quorum last_write_wins: false linkfun: {modfun,riak_kv_wm_link_walker,mapreduce_linkfun} n_val: 3 notfound_ok: true old_vclock: 86400 postcommit: [] pr: 0 precommit: [] pw: 0 r: quorum rw: quorum small_vclock: 50 w: quorum young_vclock: 20
_______________________________________________ riak-users mailing list riak-users@lists.basho.com http://lists.basho.com/mailman/listinfo/riak-users_lists.basho.com